This is an automated email from the ASF dual-hosted git repository.
ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 9aa5973 Fixed issues for Add Segment
9aa5973 is described below
commit 9aa597352aae2dd90fd8fc14faec3f2f73c9bf57
Author: manishnalla1994 <[email protected]>
AuthorDate: Wed Oct 30 11:23:01 2019 +0530
Fixed issues for Add Segment
Issue1 : When the format is given in uppercase, add segment fails with unknown format.
Solution1 : Made the format check case-insensitive.
Issue2 : The same path could be added repeatedly; this operation is now blocked.
Issue3 : Added validation for the case where the folder contains no carbon index files.
This closes #3426
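
For context, a minimal sketch of the call this fixes (table name and path are hypothetical, mirroring the test change below; a real run needs a Carbon-enabled SparkSession):

import org.apache.spark.sql.SparkSession

object AddSegmentUppercaseFormat {
  def main(args: Array[String]): Unit = {
    // Hypothetical local session; ALTER TABLE ... ADD SEGMENT only parses
    // with the CarbonData extensions on the classpath.
    val spark = SparkSession.builder()
      .appName("add-segment-sketch")
      .master("local[*]")
      .getOrCreate()
    // Before this fix an uppercase format value failed with "unknown format";
    // the option is now matched case-insensitively.
    spark.sql(
      "alter table addsegment1 add segment " +
        "options('path'='/tmp/addsegtest', 'format'='PARQUET')").show()
    spark.stop()
  }
}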
---
.../core/datamap/DistributableDataMapFormat.java | 1 +
.../carbondata/core/metadata/SegmentFileStore.java | 24 +++++++++++++++-------
.../carbondata/core/statusmanager/FileFormat.java | 4 ++--
.../testsuite/addsegment/AddSegmentTestCase.scala | 5 ++---
.../command/management/CarbonAddLoadCommand.scala | 20 ++++++++++++++++--
.../execution/strategy/MixedFormatHandler.scala | 2 +-
.../carbondata/TestStreamingTableOpName.scala | 6 +++---
.../TestStreamingTableWithRowParser.scala | 6 +++---
8 files changed, 47 insertions(+), 21 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java b/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
index 5af68b9..a4d02cb 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
@@ -225,6 +225,7 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
     if (partitions == null) {
       out.writeBoolean(false);
     } else {
+      out.writeBoolean(true);
       out.writeInt(partitions.size());
       for (PartitionSpec partitionSpec : partitions) {
         partitionSpec.write(out);
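
The hunk above fixes a write/read asymmetry: the matching reader first consumes a boolean saying whether a partition list follows, but the writer emitted that flag only on the null branch, so any non-null list left the stream misaligned. A standalone sketch of the pattern (plain java.io, not CarbonData's actual classes):

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

object NullMarkerSketch {
  // The writer must emit the presence flag on BOTH branches, otherwise
  // the reader's readBoolean() consumes payload bytes.
  def writeList(out: DataOutputStream, items: Seq[String]): Unit = {
    if (items == null) {
      out.writeBoolean(false)
    } else {
      out.writeBoolean(true) // the flag this commit adds
      out.writeInt(items.size)
      items.foreach(s => out.writeUTF(s))
    }
  }

  def readList(in: DataInputStream): Seq[String] = {
    if (!in.readBoolean()) null
    else (0 until in.readInt()).map(_ => in.readUTF())
  }

  def main(args: Array[String]): Unit = {
    val buf = new ByteArrayOutputStream()
    writeList(new DataOutputStream(buf), Seq("p1", "p2"))
    val in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray))
    assert(readList(in) == Seq("p1", "p2"))
  }
}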
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index 511687e..b03dbf4 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -184,6 +184,22 @@ public class SegmentFileStore {
     return writeSegmentFile(carbonTable, segmentId, UUID, null, segPath);
   }
 
+  /**
+   * Returns the list of index files
+   *
+   * @param segmentPath
+   * @return
+   */
+  public static CarbonFile[] getListOfCarbonIndexFiles(String segmentPath) {
+    CarbonFile segmentFolder = FileFactory.getCarbonFile(segmentPath);
+    CarbonFile[] indexFiles = segmentFolder.listFiles(new CarbonFileFilter() {
+      @Override public boolean accept(CarbonFile file) {
+        return (file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT) ||
+            file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT));
+      }
+    });
+    return indexFiles;
+  }
 
   /**
    * Write segment file to the metadata folder of the table.
@@ -195,13 +211,7 @@ public class SegmentFileStore {
   public static boolean writeSegmentFile(CarbonTable carbonTable, Segment segment)
       throws IOException {
     String tablePath = carbonTable.getTablePath();
-    CarbonFile segmentFolder = FileFactory.getCarbonFile(segment.getSegmentPath());
-    CarbonFile[] indexFiles = segmentFolder.listFiles(new CarbonFileFilter() {
-      @Override public boolean accept(CarbonFile file) {
-        return (file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT) || file.getName()
-            .endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT));
-      }
-    });
+    CarbonFile[] indexFiles = getListOfCarbonIndexFiles(segment.getSegmentPath());
     if (indexFiles != null && indexFiles.length > 0) {
       SegmentFile segmentFile = new SegmentFile();
       segmentFile.setOptions(segment.getOptions());
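
The refactor above lifts the anonymous CarbonFileFilter into the reusable getListOfCarbonIndexFiles helper so that CarbonAddLoadCommand (below) can call it during validation. A rough Scala equivalent of the suffix filter, using plain java.io instead of CarbonData's FileFactory abstraction (the two extension strings are assumed values of the CarbonTablePath constants):

import java.io.File

object IndexFileLister {
  // Assumed values of CarbonTablePath.INDEX_FILE_EXT and
  // CarbonTablePath.MERGE_INDEX_FILE_EXT.
  private val IndexExt = ".carbonindex"
  private val MergeIndexExt = ".carbonindexmerge"

  // Lists index and merge-index files in a segment folder. Note that
  // listFiles returns null when the path is missing or not a directory.
  def listCarbonIndexFiles(segmentPath: String): Array[File] =
    new File(segmentPath).listFiles { f: File =>
      f.getName.endsWith(IndexExt) || f.getName.endsWith(MergeIndexExt)
    }
}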
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java
index 4b79eb6..6f098e2 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java
@@ -33,11 +33,11 @@ public class FileFormat implements Serializable {
   private int ordinal;
 
   public FileFormat(String format) {
-    this.format = format;
+    this.format = format.toLowerCase();
   }
 
   public FileFormat(String format, int ordinal) {
-    this.format = format;
+    this.format = format.toLowerCase();
     this.ordinal = ordinal;
   }
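
Normalizing in the constructor is enough to make every later comparison case-insensitive, since equals, hashCode and toString all see the lowercase form. A minimal Scala sketch of the same idea (a stand-in class, not the actual FileFormat):

object FormatSketchDemo {
  // Stand-in for FileFormat: normalize once at construction so that
  // "PARQUET", "Parquet" and "parquet" compare equal everywhere.
  final class FormatSketch(rawFormat: String) {
    private val format = rawFormat.toLowerCase

    override def equals(other: Any): Boolean = other match {
      case that: FormatSketch => this.format == that.format
      case _ => false
    }
    override def hashCode(): Int = format.hashCode
    override def toString: String = format
  }

  def main(args: Array[String]): Unit = {
    assert(new FormatSketch("PARQUET") == new FormatSketch("parquet"))
    assert(new FormatSketch("Carbondata").toString == "carbondata")
  }
}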
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
index e39272a..8da73e2 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.test.util.QueryTest
import org.apache.spark.sql.util.SparkSQLUtil
import org.apache.spark.sql.{CarbonEnv, DataFrame, Row}
import org.scalatest.BeforeAndAfterAll
+
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.datastore.row.CarbonRow
@@ -31,9 +32,7 @@ import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.hadoop.readsupport.impl.CarbonRowReadSupport
import org.apache.carbondata.sdk.file.{CarbonReader, CarbonWriter}
-import org.apache.carbondata.spark.rdd.CarbonScanRDD
import org.junit.Assert
-
import scala.io.Source
class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
@@ -542,7 +541,7 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
     copy(path.toString, newPath)
     checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(30)))
-    sql(s"alter table addsegment1 add segment options('path'='$newPath', 'format'='parquet')").show()
+    sql(s"alter table addsegment1 add segment options('path'='$newPath', 'format'='PARQUET')").show()
     checkExistence(sql(s"show segments for table addsegment1"), true, "spark-common/target/warehouse/addsegtest")
     checkExistence(sql(s"show history segments for table addsegment1"), true, "spark-common/target/warehouse/addsegtest")
     FileFactory.deleteAllFilesOfDir(new File(newPath))
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
index 34d22a7..7b2c088 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
@@ -92,6 +92,24 @@ case class CarbonAddLoadCommand(
     val segmentPath = options.getOrElse(
       "path", throw new UnsupportedOperationException("PATH is mandatory"))
 
+    val allSegments = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+
+    // If a path is already added, block adding the same path again.
+    if (allSegments.exists(a =>
+      a.getPath != null && a.getPath.equalsIgnoreCase(segmentPath)
+    )) {
+      throw new AnalysisException(s"path already exists in table status file, cannot add the same " +
+        s"segment path repeatedly: $segmentPath")
+    }
+
+    val format = options.getOrElse("format", "carbondata")
+    val isCarbonFormat = format.equalsIgnoreCase("carbondata") || format.equalsIgnoreCase("carbon")
+
+    // If no carbon index files are found in the given location, throw an exception.
+    if (isCarbonFormat && SegmentFileStore.getListOfCarbonIndexFiles(segmentPath).isEmpty) {
+      throw new AnalysisException("CarbonIndex files not present in the location")
+    }
+
     val segSchema = MixedFormatHandler.getSchema(sparkSession, options, segmentPath)
 
     val segCarbonSchema = new Schema(segSchema.fields.map { field =>
@@ -142,8 +160,6 @@ case class CarbonAddLoadCommand(
       model.getFactTimeStamp,
       false)
     newLoadMetaEntry.setPath(segmentPath)
-    val format = options.getOrElse("format", "carbondata")
-    val isCarbonFormat = format.equals("carbondata") || format.equals("carbon")
     if (!isCarbonFormat) {
       newLoadMetaEntry.setFileFormat(new FileFormat(format))
     }
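
Condensed, the two checks added above come down to the following sketch (names are simplified and IllegalArgumentException stands in for the command's AnalysisException; registeredPaths plays the role of the paths read from the table status file):

object AddLoadValidationSketch {
  // Issue2: reject a segment path already present in the table status file.
  // Issue3: for the carbon format, the folder must contain index files.
  def validate(registeredPaths: Seq[String],
      segmentPath: String,
      format: String,
      indexFileCount: Int): Unit = {
    if (registeredPaths.exists(p => p != null && p.equalsIgnoreCase(segmentPath))) {
      throw new IllegalArgumentException(
        s"path already exists in table status file: $segmentPath")
    }
    val isCarbonFormat =
      format.equalsIgnoreCase("carbondata") || format.equalsIgnoreCase("carbon")
    if (isCarbonFormat && indexFileCount == 0) {
      throw new IllegalArgumentException("CarbonIndex files not present in the location")
    }
  }
}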
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala
index a54645d..26c0fb0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala
@@ -60,7 +60,7 @@ object MixedFormatHandler {
       options: Map[String, String],
       segPath: String): StructType = {
     val format = options.getOrElse("format", "carbondata")
-    if ((format.equals("carbondata") || format.equals("carbon"))) {
+    if ((format.equalsIgnoreCase("carbondata") || format.equalsIgnoreCase("carbon"))) {
       new SparkCarbonFileFormat().inferSchema(sparkSession, options, Seq.empty).get
     } else {
       val filePath = FileFactory.addSchemeIfNotExists(segPath.replace("\\", "/"))
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOpName.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOpName.scala
index f57c593..ade3567 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOpName.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOpName.scala
@@ -1061,13 +1061,13 @@ class TestStreamingTableOpName extends QueryTest with BeforeAndAfterAll {
     result1.foreach { row =>
       if (row.getString(0).equals("1")) {
         assertResult(SegmentStatus.STREAMING.getMessage)(row.getString(1))
-        assertResult(FileFormat.ROW_V1.toString)(row.getString(5))
+        assertResult(FileFormat.ROW_V1.toString)(row.getString(5).toLowerCase)
       } else if (row.getString(0).equals("0.1")) {
         assertResult(SegmentStatus.SUCCESS.getMessage)(row.getString(1))
-        assertResult(FileFormat.COLUMNAR_V3.toString)(row.getString(5))
+        assertResult(FileFormat.COLUMNAR_V3.toString)(row.getString(5).toLowerCase)
       } else {
         assertResult(SegmentStatus.COMPACTED.getMessage)(row.getString(1))
-        assertResult(FileFormat.COLUMNAR_V3.toString)(row.getString(5))
+        assertResult(FileFormat.COLUMNAR_V3.toString)(row.getString(5).toLowerCase)
       }
     }
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
index 71d94b7..1672e75 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
@@ -397,13 +397,13 @@ class TestStreamingTableWithRowParser extends QueryTest with BeforeAndAfterAll {
     result1.foreach { row =>
       if (row.getString(0).equals("1")) {
         assertResult(SegmentStatus.STREAMING.getMessage)(row.getString(1))
-        assertResult(FileFormat.ROW_V1.toString)(row.getString(5))
+        assertResult(FileFormat.ROW_V1.toString)(row.getString(5).toLowerCase)
       } else if (row.getString(0).equals("0.1")) {
         assertResult(SegmentStatus.SUCCESS.getMessage)(row.getString(1))
-        assertResult(FileFormat.COLUMNAR_V3.toString)(row.getString(5))
+        assertResult(FileFormat.COLUMNAR_V3.toString)(row.getString(5).toLowerCase)
       } else {
         assertResult(SegmentStatus.COMPACTED.getMessage)(row.getString(1))
-        assertResult(FileFormat.COLUMNAR_V3.toString)(row.getString(5))
+        assertResult(FileFormat.COLUMNAR_V3.toString)(row.getString(5).toLowerCase)
       }
     }