This is an automated email from the ASF dual-hosted git repository.
jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 1084136 [CARBONDATA-3517] Add Segment with Path
1084136 is described below
commit 108413610f96bfb16c2036eb2a35a71080c42594
Author: ravipesala <[email protected]>
AuthorDate: Tue Sep 10 15:42:09 2019 +0530
[CARBONDATA-3517] Add Segment with Path
This closes #3381
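The change adds an ALTER TABLE ... ADD SEGMENT command. A minimal usage sketch, mirroring AddSegmentTestCase further down (table columns and path are simplified placeholders; sql is the test suite's helper):

    sql("CREATE TABLE addsegment1 (empname String, salary int) STORED BY 'org.apache.carbondata.format'")
    // register an existing carbon data folder outside the table path as a new segment
    sql("ALTER TABLE addsegment1 ADD SEGMENT OPTIONS('path'='/tmp/addsegtest', 'format'='carbon')")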
---
.../apache/carbondata/core/datamap/Segment.java | 42 ++++
.../block/SegmentPropertiesAndSchemaHolder.java | 37 ++-
.../indexstore/blockletindex/BlockDataMap.java | 7 +-
.../carbondata/core/metadata/SegmentFileStore.java | 116 ++++++++-
.../carbondata/core/statusmanager/FileFormat.java | 42 +++-
.../core/statusmanager/LoadMetadataDetails.java | 19 +-
.../statusmanager/SegmentUpdateStatusManager.java | 39 +--
.../apache/carbondata/core/util/CarbonUtil.java | 24 +-
.../carbondata/mv/rewrite/MVCreateTestCase.scala | 116 ++++++++-
.../testsuite/addsegment/AddSegmentTestCase.scala | 280 +++++++++++++++++++++
.../org/apache/carbondata/api/CarbonStore.scala | 2 +-
.../carbondata/spark/rdd/CarbonScanRDD.scala | 11 +-
.../spark/sql/catalyst/CarbonDDLSqlParser.scala | 1 +
.../spark/rdd/CarbonDataRDDFactory.scala | 10 +-
.../command/management/CarbonAddLoadCommand.scala | 205 +++++++++++++++
.../command/mutation/DeleteExecution.scala | 17 +-
.../spark/sql/parser/CarbonSpark2SqlParser.scala | 12 +-
.../loading/CarbonDataLoadConfiguration.java | 10 +
.../processing/loading/DataLoadProcessBuilder.java | 11 +
.../store/CarbonFactDataHandlerModel.java | 10 +-
.../streaming/segment/StreamSegment.java | 2 +-
21 files changed, 931 insertions(+), 82 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
index ad80182..a235f0d 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
@@ -69,6 +69,21 @@ public class Segment implements Serializable, Writable {
private long indexSize = 0;
+ /**
+ * Whether to cache the segment data maps in executors or not.
+ */
+ private boolean isCacheable = true;
+
+ /**
+ * Path where the segment exists
+ */
+ private transient String segmentPath;
+
+ /**
+ * Properties of the segment.
+ */
+ private transient Map<String, String> options;
+
public Segment() {
}
@@ -101,6 +116,13 @@ public class Segment implements Serializable, Writable {
}
}
+ public Segment(String segmentNo, String segmentFileName, String segmentPath,
+ Map<String, String> options) {
+ this(segmentNo, segmentFileName);
+ this.segmentPath = segmentPath;
+ this.options = options;
+ }
+
/**
*
* @param segmentNo
@@ -282,6 +304,26 @@ public class Segment implements Serializable, Writable {
this.indexSize = indexSize;
}
+ public boolean isCacheable() {
+ return isCacheable;
+ }
+
+ public void setCacheable(boolean cacheable) {
+ isCacheable = cacheable;
+ }
+
+ public String getSegmentPath() {
+ return segmentPath;
+ }
+
+ public Map<String, String> getOptions() {
+ return options;
+ }
+
+ public void setOptions(Map<String, String> options) {
+ this.options = options;
+ }
+
@Override public void write(DataOutput out) throws IOException {
out.writeUTF(segmentNo);
boolean writeSegmentFileName = segmentFileName != null;
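A hedged sketch of how the new constructor and accessors above can describe an externally located segment (segment number, segment file name and path are hypothetical):

    import java.util.{HashMap => JHashMap}
    import org.apache.carbondata.core.datamap.Segment

    val options = new JHashMap[String, String]()
    options.put("format", "carbon")
    // segment "2" whose data files live outside the table path
    val segment = new Segment("2", "2_1568109129000.segment", "/data/external/seg0", options)
    segment.getSegmentPath  // "/data/external/seg0"
    segment.isCacheable     // true by default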
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesAndSchemaHolder.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesAndSchemaHolder.java
index 056a0e7..00b624f 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesAndSchemaHolder.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesAndSchemaHolder.java
@@ -295,6 +295,8 @@ public class SegmentPropertiesAndSchemaHolder {
// of maintaining 2 variables
private CarbonRowSchema[] taskSummarySchemaForBlock;
private CarbonRowSchema[] taskSummarySchemaForBlocklet;
+ private CarbonRowSchema[] taskSummarySchemaForBlockWithOutFilePath;
+ private CarbonRowSchema[] taskSummarySchemaForBlockletWithOutFilePath;
private CarbonRowSchema[] fileFooterEntrySchemaForBlock;
private CarbonRowSchema[] fileFooterEntrySchemaForBlocklet;
@@ -325,6 +327,8 @@ public class SegmentPropertiesAndSchemaHolder {
taskSummarySchemaForBlock = null;
taskSummarySchemaForBlocklet = null;
+ taskSummarySchemaForBlockWithOutFilePath = null;
+ taskSummarySchemaForBlockletWithOutFilePath = null;
fileFooterEntrySchemaForBlock = null;
fileFooterEntrySchemaForBlocklet = null;
}
@@ -393,7 +397,7 @@ public class SegmentPropertiesAndSchemaHolder {
public CarbonRowSchema[] getTaskSummarySchemaForBlock(boolean
storeBlockletCount,
boolean filePathToBeStored) throws MemoryException {
- if (null == taskSummarySchemaForBlock) {
+ if (null == taskSummarySchemaForBlock && filePathToBeStored) {
synchronized (taskSchemaLock) {
if (null == taskSummarySchemaForBlock) {
taskSummarySchemaForBlock = SchemaGenerator
@@ -401,13 +405,26 @@ public class SegmentPropertiesAndSchemaHolder {
storeBlockletCount, filePathToBeStored);
}
}
+ } else if (null == taskSummarySchemaForBlockWithOutFilePath &&
!filePathToBeStored) {
+ synchronized (taskSchemaLock) {
+ if (null == taskSummarySchemaForBlockWithOutFilePath) {
+ taskSummarySchemaForBlockWithOutFilePath = SchemaGenerator
+ .createTaskSummarySchema(segmentProperties,
getMinMaxCacheColumns(),
+ storeBlockletCount, filePathToBeStored);
+ }
+ }
+ }
+ if (filePathToBeStored) {
+ return taskSummarySchemaForBlock;
+ } else {
+ return taskSummarySchemaForBlockWithOutFilePath;
}
- return taskSummarySchemaForBlock;
+
}
public CarbonRowSchema[] getTaskSummarySchemaForBlocklet(boolean
storeBlockletCount,
boolean filePathToBeStored) throws MemoryException {
- if (null == taskSummarySchemaForBlocklet) {
+ if (null == taskSummarySchemaForBlocklet && filePathToBeStored) {
synchronized (taskSchemaLock) {
if (null == taskSummarySchemaForBlocklet) {
taskSummarySchemaForBlocklet = SchemaGenerator
@@ -415,8 +432,20 @@ public class SegmentPropertiesAndSchemaHolder {
storeBlockletCount, filePathToBeStored);
}
}
+ } else if (null == taskSummarySchemaForBlockletWithOutFilePath &&
!filePathToBeStored) {
+ synchronized (taskSchemaLock) {
+ if (null == taskSummarySchemaForBlockletWithOutFilePath) {
+ taskSummarySchemaForBlockletWithOutFilePath = SchemaGenerator
+ .createTaskSummarySchema(segmentProperties,
getMinMaxCacheColumns(),
+ storeBlockletCount, filePathToBeStored);
+ }
+ }
+ }
+ if (filePathToBeStored) {
+ return taskSummarySchemaForBlocklet;
+ } else {
+ return taskSummarySchemaForBlockletWithOutFilePath;
}
- return taskSummarySchemaForBlocklet;
}
public CarbonRowSchema[] getBlockFileFooterEntrySchema() {
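The intent of the change above is to cache one task summary schema per value of filePathToBeStored instead of sharing a single cached array. A simplified, non-CarbonData illustration of that two-slot lazy cache pattern:

    // build(flag) stands in for SchemaGenerator.createTaskSummarySchema(...)
    class TwoSlotCache[S](build: Boolean => S) {
      private val lock = new Object
      @volatile private var withPath: Option[S] = None
      @volatile private var withoutPath: Option[S] = None

      def get(filePathToBeStored: Boolean): S = lock.synchronized {
        if (filePathToBeStored) {
          if (withPath.isEmpty) withPath = Some(build(true))
          withPath.get
        } else {
          if (withoutPath.isEmpty) withoutPath = Some(build(false))
          withoutPath.get
        }
      }
    }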
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
index f168761..0169c12 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
@@ -133,8 +133,11 @@ public class BlockDataMap extends CoarseGrainDataMap
// structure
byte[] filePath = null;
boolean isPartitionTable =
blockletDataMapInfo.getCarbonTable().isHivePartitionTable();
- if (isPartitionTable ||
!blockletDataMapInfo.getCarbonTable().isTransactionalTable()
- || blockletDataMapInfo.getCarbonTable().isSupportFlatFolder()) {
+ if (isPartitionTable ||
!blockletDataMapInfo.getCarbonTable().isTransactionalTable() ||
+ blockletDataMapInfo.getCarbonTable().isSupportFlatFolder() ||
+ // if the segment data is written inside the table path, there is no need to store the whole file path.
+ !blockletDataMapInfo.getFilePath().startsWith(
+ blockletDataMapInfo.getCarbonTable().getTablePath())) {
filePath =
path.getParent().toString().getBytes(CarbonCommonConstants.DEFAULT_CHARSET);
isFilePathStored = true;
}
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index 1c2d50d..544de16 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -171,6 +171,66 @@ public class SegmentFileStore {
}
/**
+ * Write segment file to the metadata folder of the table
+ *
+ * @param carbonTable CarbonTable
+ * @param segmentId segment id
+ * @param UUID a UUID string used to construct the segment file name
+ * @return segment file name
+ */
+ public static String writeSegmentFile(CarbonTable carbonTable, String
segmentId, String UUID,
+ String segPath)
+ throws IOException {
+ return writeSegmentFile(carbonTable, segmentId, UUID, null, segPath);
+ }
+
+
+ /**
+ * Write segment file to the metadata folder of the table.
+ *
+ * @param carbonTable CarbonTable
+ * @param segment segment
+ * @return boolean, whether the write succeeded or failed
+ */
+ public static boolean writeSegmentFile(CarbonTable carbonTable, Segment
segment)
+ throws IOException {
+ String tablePath = carbonTable.getTablePath();
+ CarbonFile segmentFolder =
FileFactory.getCarbonFile(segment.getSegmentPath());
+ CarbonFile[] indexFiles = segmentFolder.listFiles(new CarbonFileFilter() {
+ @Override public boolean accept(CarbonFile file) {
+ return (file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT) ||
file.getName()
+ .endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT));
+ }
+ });
+ if (indexFiles != null && indexFiles.length > 0) {
+ SegmentFile segmentFile = new SegmentFile();
+ segmentFile.setOptions(segment.getOptions());
+ FolderDetails folderDetails = new FolderDetails();
+ folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
+ folderDetails.setRelative(false);
+ segmentFile.addPath(segment.getSegmentPath(), folderDetails);
+ for (CarbonFile file : indexFiles) {
+ if (file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+ folderDetails.setMergeFileName(file.getName());
+ } else {
+ folderDetails.getFiles().add(file.getName());
+ }
+ }
+ String segmentFileFolder =
CarbonTablePath.getSegmentFilesLocation(tablePath);
+ CarbonFile carbonFile = FileFactory.getCarbonFile(segmentFileFolder);
+ if (!carbonFile.exists()) {
+ carbonFile.mkdirs(segmentFileFolder);
+ }
+ // write segment info to new file.
+ writeSegmentFile(segmentFile,
+ segmentFileFolder + File.separator + segment.getSegmentFileName());
+
+ return true;
+ }
+ return false;
+ }
+
+ /**
* Write segment file to the metadata folder of the table selecting only the
current load files
*
* @param carbonTable
@@ -181,10 +241,13 @@ public class SegmentFileStore {
* @throws IOException
*/
public static String writeSegmentFile(CarbonTable carbonTable, String
segmentId, String UUID,
- final String currentLoadTimeStamp) throws IOException {
+ final String currentLoadTimeStamp, String absSegPath) throws IOException
{
String tablePath = carbonTable.getTablePath();
boolean supportFlatFolder = carbonTable.isSupportFlatFolder();
- String segmentPath = CarbonTablePath.getSegmentPath(tablePath, segmentId);
+ String segmentPath = absSegPath;
+ if (absSegPath == null) {
+ segmentPath = CarbonTablePath.getSegmentPath(tablePath, segmentId);
+ }
CarbonFile segmentFolder = FileFactory.getCarbonFile(segmentPath);
CarbonFile[] indexFiles = segmentFolder.listFiles(new CarbonFileFilter() {
@Override public boolean accept(CarbonFile file) {
@@ -200,7 +263,7 @@ public class SegmentFileStore {
if (indexFiles != null && indexFiles.length > 0) {
SegmentFile segmentFile = new SegmentFile();
FolderDetails folderDetails = new FolderDetails();
- folderDetails.setRelative(true);
+ folderDetails.setRelative(absSegPath == null);
folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
for (CarbonFile file : indexFiles) {
if (file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
@@ -211,7 +274,11 @@ public class SegmentFileStore {
}
String segmentRelativePath = "/";
if (!supportFlatFolder) {
- segmentRelativePath = segmentPath.substring(tablePath.length(),
segmentPath.length());
+ if (absSegPath != null) {
+ segmentRelativePath = absSegPath;
+ } else {
+ segmentRelativePath = segmentPath.substring(tablePath.length());
+ }
}
segmentFile.addPath(segmentRelativePath, folderDetails);
String segmentFileFolder =
CarbonTablePath.getSegmentFilesLocation(tablePath);
@@ -233,7 +300,6 @@ public class SegmentFileStore {
return null;
}
-
/**
* Move the loaded data from source folder to destination folder.
*/
@@ -326,6 +392,18 @@ public class SegmentFileStore {
*/
public static boolean updateSegmentFile(CarbonTable carbonTable, String
segmentId,
String segmentFile, String tableId, SegmentFileStore segmentFileStore)
throws IOException {
+ return updateSegmentFile(carbonTable, segmentId, segmentFile, tableId,
segmentFileStore, null);
+ }
+
+ /**
+ * This API will update the segmentFile of a passed segment.
+ *
+ * @return boolean which determines whether status update is done or not.
+ * @throws IOException
+ */
+ public static boolean updateSegmentFile(CarbonTable carbonTable, String
segmentId,
+ String segmentFile, String tableId, SegmentFileStore segmentFileStore,
+ SegmentStatus segmentStatus) throws IOException {
boolean status = false;
String tablePath = carbonTable.getTablePath();
String tableStatusPath = CarbonTablePath.getTableStatusFilePath(tablePath);
@@ -353,8 +431,20 @@ public class SegmentFileStore {
// if the segments is in the list of marked for delete then update
the status.
if (segmentId.equals(detail.getLoadName())) {
detail.setSegmentFile(segmentFile);
- detail.setIndexSize(String.valueOf(CarbonUtil
- .getCarbonIndexSize(segmentFileStore,
segmentFileStore.getLocationMap())));
+ if (segmentStatus != null) {
+ HashMap<String, Long> dataSizeAndIndexSize =
+ CarbonUtil.getDataSizeAndIndexSize(segmentFileStore);
+ detail.setDataSize(
+
dataSizeAndIndexSize.get(CarbonCommonConstants.CARBON_TOTAL_DATA_SIZE)
+ .toString());
+ detail.setIndexSize(
+
dataSizeAndIndexSize.get(CarbonCommonConstants.CARBON_TOTAL_INDEX_SIZE)
+ .toString());
+ detail.setSegmentStatus(segmentStatus);
+ } else {
+ detail.setIndexSize(String.valueOf(CarbonUtil
+ .getCarbonIndexSize(segmentFileStore,
segmentFileStore.getLocationMap())));
+ }
break;
}
}
@@ -1051,6 +1141,11 @@ public class SegmentFileStore {
*/
private Map<String, FolderDetails> locationMap;
+ /**
+ * Segment option properties
+ */
+ private Map<String, String> options;
+
SegmentFile() {
locationMap = new HashMap<>();
}
@@ -1086,6 +1181,13 @@ public class SegmentFileStore {
locationMap.put(path, details);
}
+ public Map<String, String> getOptions() {
+ return options;
+ }
+
+ public void setOptions(Map<String, String> options) {
+ this.options = options;
+ }
}
/**
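The two new APIs above are intended to be used together: writeSegmentFile(carbonTable, segment) writes the segment file for an externally added segment, and the extended updateSegmentFile records its sizes and status. A sketch of the call sequence, mirroring CarbonAddLoadCommand later in this diff (carbonTable, segment and segmentId are assumed to be in scope):

    import org.apache.carbondata.core.metadata.SegmentFileStore
    import org.apache.carbondata.core.statusmanager.SegmentStatus

    val wrote = SegmentFileStore.writeSegmentFile(carbonTable, segment)
    if (wrote) {
      SegmentFileStore.updateSegmentFile(
        carbonTable,
        segmentId,
        segment.getSegmentFileName,
        carbonTable.getCarbonTableIdentifier.getTableId,
        new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName),
        SegmentStatus.SUCCESS)
    }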
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java
index 05f616f..4b79eb6 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java
@@ -17,16 +17,33 @@
package org.apache.carbondata.core.statusmanager;
+import java.io.Serializable;
+import java.util.Objects;
+
/**
* The data file format supported in carbondata project
*/
-public enum FileFormat {
+public class FileFormat implements Serializable {
+
+ public static final FileFormat COLUMNAR_V3 = new FileFormat("COLUMNAR_V3",
0);
+ public static final FileFormat ROW_V1 = new FileFormat("ROW_V1", 1);
+
+ private String format;
+
+ private int ordinal;
- // carbondata columnar file format, optimized for read
- COLUMNAR_V3,
+ public FileFormat(String format) {
+ this.format = format;
+ }
+
+ public FileFormat(String format, int ordinal) {
+ this.format = format;
+ this.ordinal = ordinal;
+ }
- // carbondata row file format, optimized for write
- ROW_V1;
+ public int ordinal() {
+ return ordinal;
+ }
public static FileFormat getByOrdinal(int ordinal) {
switch (ordinal) {
@@ -38,4 +55,19 @@ public enum FileFormat {
return COLUMNAR_V3;
}
+
+ @Override public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ FileFormat that = (FileFormat) o;
+ return Objects.equals(format, that.format);
+ }
+
+ @Override public int hashCode() {
+ return Objects.hash(format);
+ }
+
+ @Override public String toString() {
+ return format;
+ }
}
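Because FileFormat changes from an enum to a regular serializable class, callers can no longer rely on reference equality across (de)serialization; comparisons move to equals(), as in the CarbonStore and CarbonScanRDD changes below. A short sketch (load is assumed to be a LoadMetadataDetails in scope):

    import org.apache.carbondata.core.statusmanager.FileFormat

    // instead of: load.getFileFormat == FileFormat.ROW_V1
    val isStreamingSegment = FileFormat.ROW_V1.equals(load.getFileFormat)
    // arbitrary external formats can now be represented as well
    val parquetFormat = new FileFormat("parquet")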
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
index 54a6d0f..17a48c5 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
@@ -122,7 +122,12 @@ public class LoadMetadataDetails implements Serializable {
/**
* the file format of this segment
*/
- private FileFormat fileFormat = FileFormat.COLUMNAR_V3;
+ private String fileFormat = FileFormat.COLUMNAR_V3.toString();
+
+ /**
+ * Segment path if the segment is added externally.
+ */
+ private String path;
/**
* Segment file name where it has the information of partition information.
@@ -420,11 +425,11 @@ public class LoadMetadataDetails implements Serializable {
}
public FileFormat getFileFormat() {
- return fileFormat;
+ return new FileFormat(fileFormat);
}
public void setFileFormat(FileFormat fileFormat) {
- this.fileFormat = fileFormat;
+ this.fileFormat = fileFormat.toString();
}
public String getSegmentFile() {
@@ -447,4 +452,12 @@ public class LoadMetadataDetails implements Serializable {
public void setExtraInfo(String extraInfo) {
this.extraInfo = extraInfo;
}
+
+ public String getPath() {
+ return path;
+ }
+
+ public void setPath(String path) {
+ this.path = path;
+ }
}
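A small sketch of how the new path field and the string-backed file format are set when an external segment is recorded (mirrors CarbonAddLoadCommand later in this diff; values are placeholders):

    import org.apache.carbondata.core.statusmanager.{FileFormat, LoadMetadataDetails}

    val newLoadMetaEntry = new LoadMetadataDetails
    newLoadMetaEntry.setPath("/data/external/seg0")           // external segment location
    newLoadMetaEntry.setFileFormat(new FileFormat("parquet")) // persisted internally as a plain string
    newLoadMetaEntry.getFileFormat                            // reconstructed as new FileFormat("parquet")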
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
index bc794f4..601abcd 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
@@ -48,7 +48,6 @@ import
org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
import org.apache.carbondata.core.mutate.SegmentUpdateDetails;
-import org.apache.carbondata.core.mutate.TupleIdEnum;
import org.apache.carbondata.core.mutate.UpdateVO;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
@@ -252,49 +251,23 @@ public class SegmentUpdateStatusManager {
* @throws Exception
*/
public String[] getDeleteDeltaFilePath(String blockFilePath, String
segmentId) throws Exception {
- String blockId =
- CarbonUtil.getBlockId(identifier, blockFilePath, segmentId, true,
isStandardTable);
- String tupleId;
- if (!isStandardTable) {
- tupleId = CarbonTablePath.getShortBlockIdForPartitionTable(blockId);
- } else {
- tupleId = CarbonTablePath.getShortBlockId(blockId);
- }
- return getDeltaFiles(tupleId, CarbonCommonConstants.DELETE_DELTA_FILE_EXT)
+ CarbonFile file = FileFactory.getCarbonFile(blockFilePath);
+ return getDeltaFiles(file, segmentId)
.toArray(new String[0]);
}
/**
* Returns all delta file paths of specified block
*/
- private List<String> getDeltaFiles(String tupleId, String extension)
- throws Exception {
- String segment = CarbonUpdateUtil.getRequiredFieldFromTID(tupleId,
TupleIdEnum.SEGMENT_ID);
- String completeBlockName = CarbonTablePath.addDataPartPrefix(
- CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.BLOCK_ID)
- + CarbonCommonConstants.FACT_FILE_EXT);
-
- String blockPath;
- if (!isStandardTable) {
- blockPath = identifier.getTablePath() +
CarbonCommonConstants.FILE_SEPARATOR
- + CarbonUpdateUtil.getRequiredFieldFromTID(tupleId,
TupleIdEnum.PART_ID)
- .replace("#", "/") + CarbonCommonConstants.FILE_SEPARATOR +
completeBlockName;
- } else {
- String carbonDataDirectoryPath = CarbonTablePath.getSegmentPath(
- identifier.getTablePath(), segment);
- blockPath =
- carbonDataDirectoryPath + CarbonCommonConstants.FILE_SEPARATOR +
completeBlockName;
- }
- CarbonFile file = FileFactory.getCarbonFile(blockPath,
FileFactory.getFileType(blockPath));
- if (!file.exists()) {
- throw new Exception("Invalid tuple id " + tupleId);
- }
+ private List<String> getDeltaFiles(CarbonFile file, String segmentId) throws
Exception {
+ String completeBlockName = file.getName();
String blockNameWithoutExtn =
completeBlockName.substring(0, completeBlockName.lastIndexOf('.'));
//blockName without timestamp
final String blockNameFromTuple =
blockNameWithoutExtn.substring(0,
blockNameWithoutExtn.lastIndexOf("-"));
- return getDeltaFiles(file, blockNameFromTuple, extension, segment);
+ return getDeltaFiles(file, blockNameFromTuple,
CarbonCommonConstants.DELETE_DELTA_FILE_EXT,
+ segmentId);
}
/**
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index cb23e3e..19d142e 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -2595,7 +2595,7 @@ public final class CarbonUtil {
}
// Get the total size of carbon data and the total size of carbon index
- private static HashMap<String, Long>
getDataSizeAndIndexSize(SegmentFileStore fileStore)
+ public static HashMap<String, Long> getDataSizeAndIndexSize(SegmentFileStore
fileStore)
throws IOException {
long carbonDataSize = 0L;
long carbonIndexSize = 0L;
@@ -2633,15 +2633,25 @@ public final class CarbonUtil {
Set<String> carbonindexFiles = folderDetails.getFiles();
String mergeFileName = folderDetails.getMergeFileName();
if (null != mergeFileName) {
- String mergeIndexPath =
- fileStore.getTablePath() + entry.getKey() +
CarbonCommonConstants.FILE_SEPARATOR
- + mergeFileName;
+ String mergeIndexPath;
+ if (entry.getValue().isRelative()) {
+ mergeIndexPath =
+ fileStore.getTablePath() + entry.getKey() +
CarbonCommonConstants.FILE_SEPARATOR
+ + mergeFileName;
+ } else {
+ mergeIndexPath = entry.getKey() +
CarbonCommonConstants.FILE_SEPARATOR + mergeFileName;
+ }
carbonIndexSize += FileFactory.getCarbonFile(mergeIndexPath).getSize();
}
for (String indexFile : carbonindexFiles) {
- String indexPath =
- fileStore.getTablePath() + entry.getKey() +
CarbonCommonConstants.FILE_SEPARATOR
- + indexFile;
+ String indexPath;
+ if (entry.getValue().isRelative()) {
+ indexPath =
+ fileStore.getTablePath() + entry.getKey() +
CarbonCommonConstants.FILE_SEPARATOR
+ + indexFile;
+ } else {
+ indexPath = entry.getKey() + CarbonCommonConstants.FILE_SEPARATOR +
indexFile;
+ }
carbonIndexSize += FileFactory.getCarbonFile(indexPath).getSize();
}
}
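The rule introduced above: entries whose FolderDetails are relative are resolved against the table path, while entries for externally added segments carry an absolute path and are used as-is. A simplified sketch of that resolution (not the CarbonData API; the real code uses CarbonCommonConstants.FILE_SEPARATOR):

    def resolveIndexPath(tablePath: String, entryKey: String,
        isRelative: Boolean, fileName: String): String = {
      if (isRelative) tablePath + entryKey + "/" + fileName
      else entryKey + "/" + fileName
    }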
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
index c40b065..31f22a7 100644
--- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
@@ -17,13 +17,16 @@
package org.apache.carbondata.mv.rewrite
import java.io.File
+import java.nio.file.{Files, Paths}
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{CarbonEnv, Row}
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
@@ -1242,6 +1245,117 @@ class MVCreateTestCase extends QueryTest with
BeforeAndAfterAll {
sql("drop table IF EXISTS maintable")
}
+ test("test create datamap with add segment") {
+ sql("drop table if exists fact_table_addseg")
+ sql("drop table if exists fact_table_addseg1")
+ sql(
+ """
+ | CREATE TABLE fact_table_addseg (empname String, designation String,
doj Timestamp,
+ | workgroupcategory int, workgroupcategoryname String, deptno int,
deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate
Timestamp,attendance int,
+ | utilization int,salary int)
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin)
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO TABLE
fact_table_addseg OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+ sql(
+ """
+ | CREATE TABLE fact_table_addseg1 (empname String, designation String,
doj Timestamp,
+ | workgroupcategory int, workgroupcategoryname String, deptno int,
deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate
Timestamp,attendance int,
+ | utilization int,salary int)
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin)
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO TABLE
fact_table_addseg1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+ sql("drop datamap if exists datamap_addseg")
+ sql("create datamap datamap_addseg using 'mv' as select empname,
designation from fact_table_addseg")
+ val df = sql("select empname,designation from fact_table_addseg")
+ val analyzed = df.queryExecution.analyzed
+ assert(TestUtil.verifyMVDataMap(analyzed, "datamap_addseg"))
+ assert(df.collect().length == 90)
+ val table = CarbonEnv.getCarbonTable(None, "fact_table_addseg1")
(sqlContext.sparkSession)
+ val path = CarbonTablePath.getSegmentPath(table.getTablePath, "0")
+ val newPath = storeLocation + "/" + "addsegtest"
+ copy(path, newPath)
+
+ sql(s"alter table fact_table_addseg add segment options('path'='$newPath',
'format'='carbon')").show()
+ sql("select empname,designation from fact_table_addseg").show()
+ val df1 = sql("select empname,designation from fact_table_addseg")
+ val analyzed1 = df1.queryExecution.analyzed
+ assert(TestUtil.verifyMVDataMap(analyzed1, "datamap_addseg"))
+ assert(df1.collect().length == 180)
+ sql(s"drop datamap datamap_addseg")
+ FileFactory.deleteAllFilesOfDir(new File(newPath))
+ sql("drop table if exists fact_table_addseg")
+ sql("drop table if exists fact_table_addseg1")
+ }
+
+ test("test create datamap with add segment with deffered rebuild") {
+ sql("drop table if exists fact_table_addseg")
+ sql("drop table if exists fact_table_addseg1")
+ sql(
+ """
+ | CREATE TABLE fact_table_addseg (empname String, designation String,
doj Timestamp,
+ | workgroupcategory int, workgroupcategoryname String, deptno int,
deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate
Timestamp,attendance int,
+ | utilization int,salary int)
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin)
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO TABLE
fact_table_addseg OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+ sql(
+ """
+ | CREATE TABLE fact_table_addseg1 (empname String, designation String,
doj Timestamp,
+ | workgroupcategory int, workgroupcategoryname String, deptno int,
deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate
Timestamp,attendance int,
+ | utilization int,salary int)
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin)
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO TABLE
fact_table_addseg1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+ sql("drop datamap if exists datamap_addseg")
+ sql("create datamap datamap_addseg using 'mv' WITH DEFERRED REBUILD as
select empname, designation from fact_table_addseg")
+ sql("rebuild datamap datamap_addseg")
+ val df = sql("select empname,designation from fact_table_addseg")
+ val analyzed = df.queryExecution.analyzed
+ assert(TestUtil.verifyMVDataMap(analyzed, "datamap_addseg"))
+ assert(df.collect().length == 90)
+ val table = CarbonEnv.getCarbonTable(None, "fact_table_addseg1")
(sqlContext.sparkSession)
+ val path = CarbonTablePath.getSegmentPath(table.getTablePath, "0")
+ val newPath = storeLocation + "/" + "addsegtest"
+ copy(path, newPath)
+
+ sql(s"alter table fact_table_addseg add segment options('path'='$newPath',
'format'='carbon')").show()
+ val df1 = sql("select empname,designation from fact_table_addseg")
+ val analyzed1 = df1.queryExecution.analyzed
+ assert(!TestUtil.verifyMVDataMap(analyzed1, "datamap_addseg"))
+ assert(df1.collect().length == 180)
+
+ sql("rebuild datamap datamap_addseg")
+
+ val df2 = sql("select empname,designation from fact_table_addseg")
+ val analyzed2 = df2.queryExecution.analyzed
+ assert(TestUtil.verifyMVDataMap(analyzed2, "datamap_addseg"))
+ assert(df2.collect().length == 180)
+
+ sql(s"drop datamap datamap_addseg")
+ sql("drop table if exists fact_table_addseg")
+ sql("drop table if exists fact_table_addseg1")
+ FileFactory.deleteAllFilesOfDir(new File(newPath))
+ }
+
+
+
+ def copy(oldLoc: String, newLoc: String): Unit = {
+ val oldFolder = FileFactory.getCarbonFile(oldLoc)
+ FileFactory.mkdirs(newLoc, FileFactory.getConfiguration)
+ val oldFiles = oldFolder.listFiles
+ for (file <- oldFiles) {
+ Files.copy(Paths.get(file.getParentFile.getPath, file.getName),
Paths.get(newLoc, file.getName))
+ }
+ }
+
override def afterAll {
drop()
CarbonProperties.getInstance()
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
new file mode 100644
index 0000000..50927ee
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.addsegment
+
+import java.io.File
+import java.nio.file.{Files, Paths}
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.strategy.CarbonDataSourceScan
+import org.apache.spark.sql.test.util.QueryTest
+import org.apache.spark.sql.{CarbonEnv, DataFrame, Row}
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.spark.rdd.CarbonScanRDD
+
+class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
+
+ override def beforeAll {
+ dropTable
+
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "dd-MM-yyyy")
+
+ }
+
+ test("Test add segment ") {
+ sql("drop table if exists addsegment1")
+ sql(
+ """
+ | CREATE TABLE addsegment1 (empname String, designation String, doj
Timestamp,
+ | workgroupcategory int, workgroupcategoryname String, deptno int,
deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate
Date,attendance int,
+ | utilization int,salary int, empno int)
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin)
+
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE
addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE
addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+ sql("select count(*) from addsegment1").show()
+ val table = CarbonEnv.getCarbonTable(None, "addsegment1")
(sqlContext.sparkSession)
+ val path = CarbonTablePath.getSegmentPath(table.getTablePath, "1")
+ val newPath = storeLocation + "/" + "addsegtest"
+ copy(path, newPath)
+ sql("delete from table addsegment1 where segment.id in (1)")
+ sql("clean files for table addsegment1")
+ checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(10)))
+
+ sql(s"alter table addsegment1 add segment options('path'='$newPath',
'format'='carbon')").show()
+ checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(20)))
+ checkAnswer(sql("select count(empname) from addsegment1"), Seq(Row(20)))
+ FileFactory.deleteAllFilesOfDir(new File(newPath))
+ }
+
+ test("Test added segment drop") {
+ sql("drop table if exists addsegment1")
+ sql(
+ """
+ | CREATE TABLE addsegment1 (empname String, designation String, doj
Timestamp,
+ | workgroupcategory int, workgroupcategoryname String, deptno int,
deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate
Date,attendance int,
+ | utilization int,salary int, empno int)
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin)
+
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE
addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE
addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+ sql("select count(*) from addsegment1").show()
+ val table = CarbonEnv.getCarbonTable(None, "addsegment1")
(sqlContext.sparkSession)
+ val path = CarbonTablePath.getSegmentPath(table.getTablePath, "1")
+ val newPath = storeLocation + "/" + "addsegtest"
+ copy(path, newPath)
+ sql("delete from table addsegment1 where segment.id in (1)")
+ sql("clean files for table addsegment1")
+ checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(10)))
+
+ sql(s"alter table addsegment1 add segment options('path'='$newPath',
'format'='carbon')").show()
+ checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(20)))
+ checkAnswer(sql("select count(empname) from addsegment1"), Seq(Row(20)))
+ sql("delete from table addsegment1 where segment.id in (2)")
+ sql("clean files for table addsegment1")
+ val oldFolder = FileFactory.getCarbonFile(newPath)
+ assert(oldFolder.listFiles.length == 0, "Added segment path should be
deleted when clean files are called")
+ FileFactory.deleteAllFilesOfDir(new File(newPath))
+ }
+
+ test("Test compact on added segment") {
+ sql("drop table if exists addsegment1")
+ sql(
+ """
+ | CREATE TABLE addsegment1 (empname String, designation String, doj
Timestamp,
+ | workgroupcategory int, workgroupcategoryname String, deptno int,
deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate
Date,attendance int,
+ | utilization int,salary int, empno int)
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin)
+
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE
addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE
addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+ sql("select count(*) from addsegment1").show()
+ val table = CarbonEnv.getCarbonTable(None, "addsegment1")
(sqlContext.sparkSession)
+ val path = CarbonTablePath.getSegmentPath(table.getTablePath, "1")
+ val newPath = storeLocation + "/" + "addsegtest"
+ copy(path, newPath)
+ sql("delete from table addsegment1 where segment.id in (1)")
+ sql("clean files for table addsegment1")
+ checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(10)))
+
+ sql(s"alter table addsegment1 add segment options('path'='$newPath',
'format'='carbon')").show()
+ checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(20)))
+ checkAnswer(sql("select count(empname) from addsegment1"), Seq(Row(20)))
+ sql("alter table addsegment1 compact 'major'").show()
+ checkAnswer(sql("select count(empname) from addsegment1"), Seq(Row(20)))
+ sql("clean files for table addsegment1")
+ val oldFolder = FileFactory.getCarbonFile(newPath)
+ assert(oldFolder.listFiles.length == 0, "Added segment path should be
deleted when clean files are called")
+ FileFactory.deleteAllFilesOfDir(new File(newPath))
+ }
+
+ test("Test compact on multiple added segments") {
+ sql("drop table if exists addsegment1")
+ sql(
+ """
+ | CREATE TABLE addsegment1 (empname String, designation String, doj
Timestamp,
+ | workgroupcategory int, workgroupcategoryname String, deptno int,
deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate
Date,attendance int,
+ | utilization int,salary int, empno int)
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin)
+
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE
addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE
addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+ sql("select count(*) from addsegment1").show()
+ val table = CarbonEnv.getCarbonTable(None, "addsegment1")
(sqlContext.sparkSession)
+ val path = CarbonTablePath.getSegmentPath(table.getTablePath, "1")
+ val newPath = storeLocation + "/" + "addsegtest"
+ for (i <- 0 until 10) {
+ copy(path, newPath+i)
+ }
+
+ sql("delete from table addsegment1 where segment.id in (1)")
+ sql("clean files for table addsegment1")
+ checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(10)))
+ for (i <- 0 until 10) {
+ sql(s"alter table addsegment1 add segment options('path'='${newPath+i}',
'format'='carbon')").show()
+
+ }
+ checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(110)))
+ checkAnswer(sql("select count(empname) from addsegment1"), Seq(Row(110)))
+ sql("alter table addsegment1 compact 'minor'").show()
+ checkAnswer(sql("select count(empname) from addsegment1"), Seq(Row(110)))
+ sql("clean files for table addsegment1")
+ val oldFolder = FileFactory.getCarbonFile(newPath)
+ assert(oldFolder.listFiles.length == 0, "Added segment path should be
deleted when clean files are called")
+ for (i <- 0 until 10) {
+ FileFactory.deleteAllFilesOfDir(new File(newPath+i))
+ }
+ }
+
+
+ test("Test update on added segment") {
+ sql("drop table if exists addsegment1")
+ sql(
+ """
+ | CREATE TABLE addsegment1 (empname String, designation String, doj
Timestamp,
+ | workgroupcategory int, workgroupcategoryname String, deptno int,
deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate
Date,attendance int,
+ | utilization int,salary int, empno int)
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin)
+
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE
addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE
addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+ sql("select count(*) from addsegment1").show()
+ val table = CarbonEnv.getCarbonTable(None, "addsegment1")
(sqlContext.sparkSession)
+ val path = CarbonTablePath.getSegmentPath(table.getTablePath, "1")
+ val newPath = storeLocation + "/" + "addsegtest"
+ copy(path, newPath)
+ sql("delete from table addsegment1 where segment.id in (1)")
+ sql("clean files for table addsegment1")
+ checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(10)))
+
+ sql(s"alter table addsegment1 add segment options('path'='$newPath',
'format'='carbon')").show()
+ checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(20)))
+ checkAnswer(sql("select count(empname) from addsegment1"), Seq(Row(20)))
+ sql("""update addsegment1 d set (d.empname) = ('ravi') where d.empname =
'arvind'""").show()
+ checkAnswer(sql("select count(*) from addsegment1 where empname='ravi'"),
Seq(Row(2)))
+ checkAnswer(sql("select count(empname) from addsegment1"), Seq(Row(20)))
+ FileFactory.deleteAllFilesOfDir(new File(newPath))
+ }
+
+ test("Test validation on added segment") {
+ sql("drop table if exists addsegment1")
+ sql("drop table if exists addsegment2")
+ sql(
+ """
+ | CREATE TABLE addsegment1 (empname String, designation String, doj
Timestamp,
+ | workgroupcategory int, workgroupcategoryname String, deptno int,
deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate
Date,attendance int,
+ | utilization int,salary int, empno int)
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin)
+
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE
addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+ sql(
+ """
+ | CREATE TABLE addsegment2 (empname String, designation String, doj
Timestamp,
+ | workgroupcategory int, workgroupcategoryname String, deptno int,
deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate
Date,attendance int,
+ | utilization int,salary int)
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin)
+
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE
addsegment2 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+ sql("select count(*) from addsegment1").show()
+ val table = CarbonEnv.getCarbonTable(None, "addsegment2")
(sqlContext.sparkSession)
+ val path = CarbonTablePath.getSegmentPath(table.getTablePath, "0")
+ val newPath = storeLocation + "/" + "addsegtest"
+ copy(path, newPath)
+ checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(10)))
+
+ val ex = intercept[Exception] {
+ sql(s"alter table addsegment1 add segment options('path'='$newPath',
'format'='carbon')").show()
+ }
+ assert(ex.getMessage.contains("Schema is not same"))
+ FileFactory.deleteAllFilesOfDir(new File(newPath))
+ }
+
+ def copy(oldLoc: String, newLoc: String): Unit = {
+ val oldFolder = FileFactory.getCarbonFile(oldLoc)
+ FileFactory.mkdirs(newLoc, FileFactory.getConfiguration)
+ val oldFiles = oldFolder.listFiles
+ for (file <- oldFiles) {
+ Files.copy(Paths.get(file.getParentFile.getPath, file.getName),
Paths.get(newLoc, file.getName))
+ }
+ }
+
+
+ override def afterAll = {
+ dropTable
+ }
+
+ def dropTable = {
+ sql("drop table if exists addsegment1")
+ sql("drop table if exists addsegment2")
+ }
+
+}
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
index 0544d22..f348da7 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
@@ -98,7 +98,7 @@ object CarbonStore {
new java.sql.Timestamp(load.getLoadEndTime).toString
}
- val (dataSize, indexSize) = if (load.getFileFormat ==
FileFormat.ROW_V1) {
+ val (dataSize, indexSize) = if
(load.getFileFormat.equals(FileFormat.ROW_V1)) {
// for streaming segment, we should get the actual size from the
index file
// since it is continuously inserting data
val segmentDir = CarbonTablePath.getSegmentPath(tablePath,
load.getLoadName)
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 73aba46..cc7ba5a 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -146,7 +146,7 @@ class CarbonScanRDD[T: ClassTag](
val streamSplits = new ArrayBuffer[InputSplit]()
splits.asScala.foreach { split =>
val carbonInputSplit = split.asInstanceOf[CarbonInputSplit]
- if (FileFormat.ROW_V1 == carbonInputSplit.getFileFormat) {
+ if (FileFormat.ROW_V1.equals(carbonInputSplit.getFileFormat)) {
streamSplits += split
} else {
columnarSplits.add(split)
@@ -428,8 +428,9 @@ class CarbonScanRDD[T: ClassTag](
// one query id per table
model.setQueryId(queryId)
// get RecordReader by FileFormat
- var reader: RecordReader[Void, Object] = inputSplit.getFileFormat match {
- case FileFormat.ROW_V1 =>
+
+ var reader: RecordReader[Void, Object] =
+ if (inputSplit.getFileFormat.equals(FileFormat.ROW_V1)) {
// create record reader for row format
DataTypeUtil.setDataTypeConverter(dataTypeConverterClz.newInstance())
val inputFormat = new CarbonStreamInputFormat
@@ -441,7 +442,7 @@ class CarbonScanRDD[T: ClassTag](
val streamReader = inputFormat.createRecordReader(inputSplit,
attemptContext)
.asInstanceOf[RecordReader[Void, Object]]
streamReader
- case _ =>
+ } else {
// create record reader for CarbonData file format
if (vectorReader) {
model.setDirectVectorFill(directFill)
@@ -461,7 +462,7 @@ class CarbonScanRDD[T: ClassTag](
format.getReadSupportClass(attemptContext.getConfiguration),
inputMetricsStats, attemptContext.getConfiguration)
}
- }
+ }
val closeReader = () => {
if (reader != null) {
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index c331532..90dfc7c 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -191,6 +191,7 @@ abstract class CarbonDDLSqlParser extends
AbstractCarbonSparkSQLParser {
protected val STREAMS = carbonKeyWord("STREAMS")
protected val STMPROPERTIES = carbonKeyWord("STMPROPERTIES")
protected val CARBONCLI = carbonKeyWord("CARBONCLI")
+ protected val PATH = carbonKeyWord("PATH")
protected val doubleQuotedString = "\"([^\"]+)\"".r
protected val singleQuotedString = "'([^']+)'".r
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 74c9140..b289045 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -618,7 +618,7 @@ object CarbonDataRDDFactory {
/**
* clear datamap files for segment
*/
- private def clearDataMapFiles(carbonTable: CarbonTable, segmentId: String):
Unit = {
+ def clearDataMapFiles(carbonTable: CarbonTable, segmentId: String): Unit = {
try {
val segments = List(new Segment(segmentId)).asJava
DataMapStoreManager.getInstance().getAllDataMap(carbonTable).asScala
@@ -646,14 +646,16 @@ object CarbonDataRDDFactory {
SegmentStatusManager.readTableStatusFile(
CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath))
val segmentFiles = segmentDetails.asScala.map { seg =>
- val segmentFile =
-
metadataDetails.find(_.getLoadName.equals(seg.getSegmentNo)).get.getSegmentFile
+ val load =
+ metadataDetails.find(_.getLoadName.equals(seg.getSegmentNo)).get
+ val segmentFile = load.getSegmentFile
var segmentFiles: Seq[CarbonFile] = Seq.empty[CarbonFile]
val file = SegmentFileStore.writeSegmentFile(
carbonTable,
seg.getSegmentNo,
- String.valueOf(System.currentTimeMillis()))
+ String.valueOf(System.currentTimeMillis()),
+ load.getPath)
if (segmentFile != null) {
segmentFiles ++= FileFactory.getCarbonFile(
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
new file mode 100644
index 0000000..6bc122c
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.management
+
+import java.io.File
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SparkSession}
+import
org.apache.spark.sql.carbondata.execution.datasources.SparkCarbonFileFormat
+import org.apache.spark.sql.execution.command.{Checker, MetadataCommand}
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.types.StructType
+
+import
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
+import org.apache.carbondata.core.datamap.status.DataMapStatusManager
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.exception.ConcurrentOperationException
+import org.apache.carbondata.core.metadata.SegmentFileStore
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.statusmanager.{FileFormat,
LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.{BuildDataMapPostExecutionEvent,
BuildDataMapPreExecutionEvent, OperationContext, OperationListenerBus}
+import
org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostExecutionEvent,
LoadTablePostStatusUpdateEvent, LoadTablePreExecutionEvent,
LoadTablePreStatusUpdateEvent}
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema,
CarbonLoadModel}
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
+import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory.clearDataMapFiles
+
+
+/**
+ * Users can add an external data folder as a segment to a transactional table.
+ * For an external carbon data folder there is no need to specify the format in the options,
+ * but for other formats such as parquet the user must specify format=parquet in the options.
+ */
+case class CarbonAddLoadCommand(
+ databaseNameOp: Option[String],
+ tableName: String,
+ options: Map[String, String])
+ extends MetadataCommand {
+
+ private val LOGGER =
LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+ override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+ Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
+ val relation = CarbonEnv
+ .getInstance(sparkSession)
+ .carbonMetaStore
+ .lookupRelation(databaseNameOp, tableName)(sparkSession)
+ .asInstanceOf[CarbonRelation]
+ val tableSchema = StructType.fromAttributes(relation.output)
+ val carbonTable = relation.carbonTable
+ setAuditTable(carbonTable)
+ if (!carbonTable.getTableInfo.isTransactionalTable) {
+ throw new MalformedCarbonCommandException("Unsupported operation on non
transactional table")
+ }
+
+ // if insert overwrite in progress, do not allow add segment
+ if (SegmentStatusManager.isOverwriteInProgressInTable(carbonTable)) {
+ throw new ConcurrentOperationException(carbonTable, "insert overwrite",
"delete segment")
+ }
+ val segmentPath = options.getOrElse(
+ "path", throw new UnsupportedOperationException("PATH is manadatory"))
+
+ // TODO use the fileformat based on the format.
+ val segSchema = new SparkCarbonFileFormat().inferSchema(sparkSession,
options, Seq.empty).get
+
+ if (!tableSchema.equals(segSchema)) {
+ throw new AnalysisException(s"Schema is not same. Table schema is : " +
+ s"${tableSchema} and segment schema is :
${segSchema}")
+ }
+
+ val model = new CarbonLoadModel
+ model.setCarbonTransactionalTable(true)
+ model.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable))
+ model.setDatabaseName(carbonTable.getDatabaseName)
+ model.setTableName(carbonTable.getTableName)
+ val operationContext = new OperationContext
+ val loadTablePreExecutionEvent: LoadTablePreExecutionEvent =
+ new LoadTablePreExecutionEvent(
+ carbonTable.getCarbonTableIdentifier,
+ model)
+ operationContext.setProperty("isOverwrite", false)
+ OperationListenerBus.getInstance.fireEvent(loadTablePreExecutionEvent,
operationContext)
+ // Add pre event listener for index datamap
+ val tableDataMaps =
DataMapStoreManager.getInstance().getAllDataMap(carbonTable)
+ val dataMapOperationContext = new OperationContext()
+ if (tableDataMaps.size() > 0) {
+ val dataMapNames: mutable.Buffer[String] =
+ tableDataMaps.asScala.map(dataMap =>
dataMap.getDataMapSchema.getDataMapName)
+ val buildDataMapPreExecutionEvent: BuildDataMapPreExecutionEvent =
+ new BuildDataMapPreExecutionEvent(sparkSession,
+ carbonTable.getAbsoluteTableIdentifier, dataMapNames)
+
OperationListenerBus.getInstance().fireEvent(buildDataMapPreExecutionEvent,
+ dataMapOperationContext)
+ }
+
+ val newLoadMetaEntry = new LoadMetadataDetails
+ model.setFactTimeStamp(CarbonUpdateUtil.readCurrentTime)
+ CarbonLoaderUtil.populateNewLoadMetaEntry(newLoadMetaEntry,
+ SegmentStatus.INSERT_IN_PROGRESS,
+ model.getFactTimeStamp,
+ false)
+ newLoadMetaEntry.setPath(segmentPath)
+ val format = options.getOrElse("format", "carbondata")
+ if (!(format.equals("carbondata") || format.equals("carbon"))) {
+ newLoadMetaEntry.setFileFormat(new FileFormat(format))
+ }
+
+ CarbonLoaderUtil.recordNewLoadMetadata(newLoadMetaEntry, model, true,
false)
+ val segment = new Segment(model.getSegmentId,
+ SegmentFileStore.genSegmentFileName(
+ model.getSegmentId,
+ System.nanoTime().toString) + CarbonTablePath.SEGMENT_EXT,
+ segmentPath,
+ new util.HashMap[String, String](options.asJava))
+ val writeSegment =
+ SegmentFileStore.writeSegmentFile(carbonTable, segment)
+
+ operationContext.setProperty(carbonTable.getTableUniqueName + "_Segment",
+ model.getSegmentId)
+ val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =
+ new LoadTablePreStatusUpdateEvent(
+ carbonTable.getCarbonTableIdentifier,
+ model)
+
OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent,
operationContext)
+
+ val success = if (writeSegment) {
+ SegmentFileStore.updateSegmentFile(
+ carbonTable,
+ model.getSegmentId,
+ segment.getSegmentFileName,
+ carbonTable.getCarbonTableIdentifier.getTableId,
+ new SegmentFileStore(carbonTable.getTablePath,
segment.getSegmentFileName),
+ SegmentStatus.SUCCESS)
+ } else {
+ false
+ }
+
+ val postExecutionEvent = if (success) {
+ val loadTablePostStatusUpdateEvent: LoadTablePostStatusUpdateEvent =
+ new LoadTablePostStatusUpdateEvent(model)
+ val commitComplete = try {
+ OperationListenerBus.getInstance()
+ .fireEvent(loadTablePostStatusUpdateEvent, operationContext)
+ true
+ } catch {
+ case ex: Exception =>
+ LOGGER.error("Problem while committing data maps", ex)
+ false
+ }
+ commitComplete
+ } else {
+ success
+ }
+
+ if (!postExecutionEvent || !success) {
+ CarbonLoaderUtil.updateTableStatusForFailure(model,
"uniqueTableStatusId")
+ LOGGER.info("********starting clean up**********")
+ // delete segment is applicable for transactional table
+ CarbonLoaderUtil.deleteSegment(model, model.getSegmentId.toInt)
+ // delete corresponding segment file from metadata
+ val segmentFile =
CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath) +
+ File.separator + segment.getSegmentFileName
+ FileFactory.deleteFile(segmentFile, FileFactory.getFileType(segmentFile))
+ clearDataMapFiles(carbonTable, model.getSegmentId)
+ LOGGER.info("********clean up done**********")
+ LOGGER.error("Data load failed due to failure in table status updation.")
+ throw new Exception("Data load failed due to failure in table status updation.")
+ }
+ DataMapStatusManager.disableAllLazyDataMaps(carbonTable)
+ val loadTablePostExecutionEvent: LoadTablePostExecutionEvent =
+ new LoadTablePostExecutionEvent(
+ carbonTable.getCarbonTableIdentifier,
+ model)
+ OperationListenerBus.getInstance.fireEvent(loadTablePostExecutionEvent, operationContext)
+ if (tableDataMaps.size() > 0) {
+ val buildDataMapPostExecutionEvent = BuildDataMapPostExecutionEvent(sparkSession,
+ carbonTable.getAbsoluteTableIdentifier, null, Seq(model.getSegmentId), false)
+ OperationListenerBus.getInstance()
+ .fireEvent(buildDataMapPostExecutionEvent, dataMapOperationContext)
+ }
+ Seq.empty
+ }
+
+ override protected def opName: String = "ADD SEGMENT WITH PATH"
+}
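Note: a usage sketch of the command above (table, path, and format values are illustrative, not from this commit). The statement registers files already present at the given location as a new segment: a load metadata entry is recorded with the external path, the segment file is generated, and the segment is marked SUCCESS; if writing the segment file or updating the table status fails, the segment and its segment file are deleted and an exception is thrown.

    // Assumes a SparkSession `spark` with the CarbonData extensions enabled.
    spark.sql(
      "ALTER TABLE default.sales ADD SEGMENT " +
        "OPTIONS('path'='hdfs://store/external/sales_day1', 'format'='carbondata')")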
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index cb86cb5..08b0dd0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -21,6 +21,7 @@ import java.util
import scala.collection.JavaConverters._
+import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job
@@ -40,7 +41,7 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, DeleteDeltaBlockDetails, SegmentUpdateDetails, TupleIdEnum}
import org.apache.carbondata.core.mutate.data.RowCountDetailsVO
-import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager, SegmentUpdateStatusManager}
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager, SegmentUpdateStatusManager}
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, ThreadLocalSessionInfo}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl
@@ -130,14 +131,14 @@ object DeleteExecution {
var result = List[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors, Long))]()
while (records.hasNext) {
val ((key), (rowCountDetailsVO, groupedRows)) = records.next
- val segmentId = key.substring(0, key.indexOf(CarbonCommonConstants.FILE_SEPARATOR))
result = result ++
deleteDeltaFunc(index,
key,
groupedRows.toIterator,
timestamp,
rowCountDetailsVO,
- isStandardTable)
+ isStandardTable,
+ metadataDetails)
}
result
}).collect()
@@ -217,7 +218,8 @@ object DeleteExecution {
iter: Iterator[Row],
timestamp: String,
rowCountDetailsVO: RowCountDetailsVO,
- isStandardTable: Boolean
+ isStandardTable: Boolean,
+ loads: Array[LoadMetadataDetails]
): Iterator[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors, Long))] = {
val result = new DeleteDelataResultImpl()
@@ -228,6 +230,7 @@ object DeleteExecution {
.getBlockName(
CarbonTablePath.addDataPartPrefix(key.split(CarbonCommonConstants.FILE_SEPARATOR)(1)))
val segmentId = key.split(CarbonCommonConstants.FILE_SEPARATOR)(0)
+ val load = loads.find(l => l.getLoadName.equalsIgnoreCase(segmentId)).get
val deleteDeltaBlockDetails: DeleteDeltaBlockDetails = new DeleteDeltaBlockDetails(blockName)
val resultIter =
new Iterator[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors, Long))] {
@@ -255,7 +258,11 @@ object DeleteExecution {
}
val blockPath =
- CarbonUpdateUtil.getTableBlockPath(TID, tablePath, isStandardTable)
+ if (StringUtils.isNotEmpty(load.getPath)) {
+ load.getPath
+ } else {
+ CarbonUpdateUtil.getTableBlockPath(TID, tablePath, isStandardTable)
+ }
val completeBlockName = CarbonTablePath
.addDataPartPrefix(CarbonUpdateUtil.getRequiredFieldFromTID(TID, TupleIdEnum.BLOCK_ID) +
CarbonCommonConstants.FACT_FILE_EXT)
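Note: the load metadata is now passed into deleteDeltaFunc because a segment added from an external path records its location in LoadMetadataDetails, and the delete-delta writer must use that path rather than derive the block location from the table path. The branch above, restated as a standalone helper (a sketch; the helper name is illustrative):

    import org.apache.commons.lang3.StringUtils
    import org.apache.carbondata.core.mutate.CarbonUpdateUtil
    import org.apache.carbondata.core.statusmanager.LoadMetadataDetails

    // Prefer the external segment path recorded at ADD SEGMENT time, if present.
    def resolveBlockPath(load: LoadMetadataDetails, tupleId: String,
        tablePath: String, isStandardTable: Boolean): String =
      if (StringUtils.isNotEmpty(load.getPath)) load.getPath
      else CarbonUpdateUtil.getTableBlockPath(tupleId, tablePath, isStandardTable)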
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 22548ff..8670b13 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -81,7 +81,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
cacheManagement | alterDataMap
protected lazy val loadManagement: Parser[LogicalPlan] =
- deleteLoadsByID | deleteLoadsByLoadDate | cleanFiles | loadDataNew
+ deleteLoadsByID | deleteLoadsByLoadDate | cleanFiles | loadDataNew | addLoad
protected lazy val restructure: Parser[LogicalPlan] =
alterTableColumnRenameAndModifyDataType | alterTableDropColumn | alterTableAddColumns
@@ -482,6 +482,16 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
}
}
+ /**
+ * ALTER TABLE <db.tableName> ADD SEGMENT OPTIONS('path'='path','key'='value')
+ */
+ protected lazy val addLoad: Parser[LogicalPlan] =
+ ALTER ~ TABLE ~> (ident <~ ".").? ~ ident ~ (ADD ~> SEGMENT) ~
+ (OPTIONS ~> "(" ~> repsep(loadOptions, ",") <~ ")") <~ opt(";") ^^ {
+ case dbName ~ tableName ~ segment ~ optionsList =>
+ CarbonAddLoadCommand(dbName, tableName, optionsList.toMap)
+ }
+
protected lazy val cleanFiles: Parser[LogicalPlan] =
CLEAN ~> FILES ~> FOR ~> TABLE ~> (ident <~ ".").? ~ ident <~ opt(";") ^^ {
case databaseName ~ tableName =>
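Note: the addLoad production above desugars the statement into a CarbonAddLoadCommand carrying the parsed option map, as in the following sketch (statement and values are illustrative):

    // ALTER TABLE db1.t1 ADD SEGMENT OPTIONS('path'='/ext/seg0')
    CarbonAddLoadCommand(Some("db1"), "t1", Map("path" -> "/ext/seg0"))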
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
index 8cdb6af..408c961 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
@@ -46,6 +46,8 @@ public class CarbonDataLoadConfiguration {
private BucketingInfo bucketingInfo;
+ private String segmentPath;
+
private Map<String, Object> dataLoadProperties = new HashMap<>();
/**
@@ -444,4 +446,12 @@ public class CarbonDataLoadConfiguration {
public void setNumberOfLoadingCores(int numberOfLoadingCores) {
this.numberOfLoadingCores = numberOfLoadingCores;
}
+
+ public String getSegmentPath() {
+ return segmentPath;
+ }
+
+ public void setSegmentPath(String segmentPath) {
+ this.segmentPath = segmentPath;
+ }
}
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index 2cb3895..520943e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -34,6 +34,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
@@ -220,6 +221,16 @@ public final class DataLoadProcessBuilder {
configuration.setSchemaUpdatedTimeStamp(carbonTable.getTableLastUpdatedTime());
configuration.setHeader(loadModel.getCsvHeaderColumns());
configuration.setSegmentId(loadModel.getSegmentId());
+ List<LoadMetadataDetails> loadMetadataDetails = loadModel.getLoadMetadataDetails();
+ if (loadMetadataDetails != null) {
+ for (LoadMetadataDetails detail : loadMetadataDetails) {
+ if (detail.getLoadName().equals(loadModel.getSegmentId()) && StringUtils
+ .isNotEmpty(detail.getPath())) {
+ configuration.setSegmentPath(detail.getPath());
+ }
+ }
+ }
+
configuration.setTaskNo(loadModel.getTaskNo());
String[] complexDelimiters = new String[loadModel.getComplexDelimiters().size()];
loadModel.getComplexDelimiters().toArray(complexDelimiters);
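Note: the loop above wires an externally supplied segment location into the load configuration. The same lookup restated in Scala as a sketch (the Java loop above is the actual implementation; the helper name is illustrative):

    import scala.collection.JavaConverters._
    import org.apache.commons.lang3.StringUtils
    import org.apache.carbondata.core.statusmanager.LoadMetadataDetails

    // Returns the external path recorded for the segment currently being loaded, if any.
    def externalSegmentPath(details: java.util.List[LoadMetadataDetails],
        segmentId: String): Option[String] =
      Option(details).toSeq.flatMap(_.asScala)
        .find(d => d.getLoadName == segmentId && StringUtils.isNotEmpty(d.getPath))
        .map(_.getPath)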
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index 39a83d9..28c5fac 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -493,9 +493,13 @@ public class CarbonFactDataHandlerModel {
if (!configuration.isCarbonTransactionalTable()) {
carbonDataDirectoryPath = absoluteTableIdentifier.getTablePath();
} else {
- carbonDataDirectoryPath = CarbonTablePath
- .getSegmentPath(absoluteTableIdentifier.getTablePath(),
- configuration.getSegmentId() + "");
+ if (configuration.getSegmentPath() != null) {
+ carbonDataDirectoryPath = configuration.getSegmentPath();
+ } else {
+ carbonDataDirectoryPath = CarbonTablePath
+ .getSegmentPath(absoluteTableIdentifier.getTablePath(),
+ configuration.getSegmentId() + "");
+ }
}
CarbonUtil.checkAndCreateFolder(carbonDataDirectoryPath);
return carbonDataDirectoryPath;
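Note: the branch above decides where the fact data files are written: a non-transactional table writes directly under the table path, an externally added segment keeps its own directory, and everything else uses the standard segment directory under the table path. Restated as a small helper (a sketch; parameter names are illustrative):

    import org.apache.carbondata.core.util.path.CarbonTablePath

    def dataDirectory(tablePath: String, segmentId: String,
        transactional: Boolean, externalSegmentPath: Option[String]): String =
      if (!transactional) tablePath
      else externalSegmentPath.getOrElse(CarbonTablePath.getSegmentPath(tablePath, segmentId))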
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
index ba3d64a..8c3fd14 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
@@ -86,7 +86,7 @@ public class StreamSegment {
CarbonTablePath.getMetadataPath(table.getTablePath()));
LoadMetadataDetails streamSegment = null;
for (LoadMetadataDetails detail : details) {
- if (FileFormat.ROW_V1 == detail.getFileFormat()) {
+ if (FileFormat.ROW_V1.equals(detail.getFileFormat())) {
if (SegmentStatus.STREAMING == detail.getSegmentStatus()) {
streamSegment = detail;
break;