This is an automated email from the ASF dual-hosted git repository.
indhumuthumurugesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 35c4b33 [CARBONDATA-4133] Concurrent Insert Overwrite with static partition on Index server fails
35c4b33 is described below
commit 35c4b33e4ac6722356756aa0a100dee0724094c6
Author: ShreelekhyaG <[email protected]>
AuthorDate: Tue Feb 23 20:45:46 2021 +0530
[CARBONDATA-4133] Concurrent Insert Overwrite with static partition on Index server fails
Why is this PR needed?
Concurrent Insert Overwrite with static partition on Index server fails.
When the index server and pre-priming are enabled, pre-priming is triggered even when the load fails, because the call sits in a finally block.
There is also performance degradation with the index server due to #4080.
What changes were proposed in this PR?
Removed the triggerPrepriming call from the finally block.
Reverted #4080 and used a boolean flag to identify an external segment.
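
As an illustration of the flag-based serialization: the sketch below is minimal and hypothetical (the SegmentFlagSketch class and its main method are not part of this patch), but it mirrors the idea applied inside Segment's write/readFields, where only a boolean travels to the index server instead of the segment path string.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical stand-in for the relevant part of Segment: instead of writing the
// (possibly null) segmentPath, only a boolean flag crosses the wire.
class SegmentFlagSketch {
  private transient String segmentPath;  // kept locally, no longer serialized
  private boolean isExternalSegment;     // true when the load metadata carries an external path

  SegmentFlagSketch(String segmentPath) {
    this.segmentPath = segmentPath;
    this.isExternalSegment = segmentPath != null;
  }

  SegmentFlagSketch() {
  }

  void write(DataOutput out) throws IOException {
    // the reverted code wrote a presence flag plus the path string; one boolean is enough
    out.writeBoolean(isExternalSegment);
  }

  void readFields(DataInput in) throws IOException {
    this.isExternalSegment = in.readBoolean();
  }

  boolean isExternalSegment() {
    return isExternalSegment;
  }

  public static void main(String[] args) throws IOException {
    SegmentFlagSketch original = new SegmentFlagSketch("hdfs://hacluster/opt/newsegmentpath/");
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    original.write(new DataOutputStream(buffer));

    SegmentFlagSketch copy = new SegmentFlagSketch();
    copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
    // prints "true": the receiver only needs to know whether the segment is external
    System.out.println(copy.isExternalSegment());
  }
}
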
Does this PR introduce any user interface change?
No
Is any new testcase added?
No, tested in cluster.
This closes #4096
---
.../org/apache/carbondata/core/index/Segment.java | 26 +++++++++++---------
.../apache/carbondata/core/index/TableIndex.java | 2 +-
.../core/indexstore/ExtendedBlocklet.java | 28 +++++++---------------
.../command/management/CommonLoadUtils.scala | 19 +++++++--------
4 files changed, 33 insertions(+), 42 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/index/Segment.java b/core/src/main/java/org/apache/carbondata/core/index/Segment.java
index 6129373..0a32984 100644
--- a/core/src/main/java/org/apache/carbondata/core/index/Segment.java
+++ b/core/src/main/java/org/apache/carbondata/core/index/Segment.java
@@ -79,7 +79,12 @@ public class Segment implements Serializable, Writable {
/**
* Path of segment where it exists
*/
- private String segmentPath;
+ private transient String segmentPath;
+
+ /**
+ * To identify if it is an externally added segment or not.
+ */
+ private boolean isExternalSegment = false;
/**
* Properties of the segment.
@@ -162,7 +167,9 @@ public class Segment implements Serializable, Writable {
this.segmentFileName = segmentFileName;
this.readCommittedScope = readCommittedScope;
this.loadMetadataDetails = loadMetadataDetails;
- this.segmentPath = loadMetadataDetails.getPath();
+ if (loadMetadataDetails.getPath() != null) {
+ this.isExternalSegment = true;
+ }
if (loadMetadataDetails.getIndexSize() != null) {
this.indexSize = Long.parseLong(loadMetadataDetails.getIndexSize());
}
@@ -378,12 +385,7 @@ public class Segment implements Serializable, Writable {
out.writeUTF(segmentString);
}
out.writeLong(indexSize);
- if (segmentPath == null) {
- out.writeBoolean(false);
- } else {
- out.writeBoolean(true);
- out.writeUTF(segmentPath);
- }
+ out.writeBoolean(isExternalSegment);
}
@Override
@@ -401,9 +403,7 @@ public class Segment implements Serializable, Writable {
this.segmentString = in.readUTF();
}
this.indexSize = in.readLong();
- if (in.readBoolean()) {
- this.segmentPath = in.readUTF();
- }
+ this.isExternalSegment = in.readBoolean();
}
public SegmentMetaDataInfo getSegmentMetaDataInfo() {
@@ -413,4 +413,8 @@ public class Segment implements Serializable, Writable {
public void setSegmentMetaDataInfo(SegmentMetaDataInfo segmentMetaDataInfo) {
this.segmentMetaDataInfo = segmentMetaDataInfo;
}
+
+ public boolean isExternalSegment() {
+ return isExternalSegment;
+ }
}
diff --git a/core/src/main/java/org/apache/carbondata/core/index/TableIndex.java b/core/src/main/java/org/apache/carbondata/core/index/TableIndex.java
index ddf91ec..c85f64d 100644
--- a/core/src/main/java/org/apache/carbondata/core/index/TableIndex.java
+++ b/core/src/main/java/org/apache/carbondata/core/index/TableIndex.java
@@ -210,7 +210,7 @@ public final class TableIndex extends OperationEventListener {
indexes.get(segment) == null || indexes.get(segment).isEmpty()) {
continue;
}
- boolean isExternalSegment = segment.getSegmentPath() != null;
+ boolean isExternalSegment = segment.isExternalSegment();
List<Blocklet> pruneBlocklets = new ArrayList<>();
SegmentProperties segmentProperties =
segmentPropertiesFetcher.getSegmentProperties(segment, partitionLocations);
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
index 900d7c2..dac4d52 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
@@ -20,13 +20,10 @@ package org.apache.carbondata.core.indexstore;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.DataOutputStream;
-import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
-import org.apache.carbondata.core.datastore.filesystem.LocalCarbonFile;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.index.Segment;
import org.apache.carbondata.core.indexstore.blockletindex.BlockletIndexRowIndexes;
import org.apache.carbondata.core.indexstore.row.IndexRow;
@@ -200,6 +197,7 @@ public class ExtendedBlocklet extends Blocklet {
inputSplit.setWriteDetailInfo(false);
}
inputSplit.serializeFields(dos, uniqueLocation);
+ out.writeBoolean(inputSplit.getSegment().isExternalSegment());
out.writeInt(ebos.size());
out.write(ebos.getBuffer(), 0, ebos.size());
}
@@ -225,25 +223,15 @@ public class ExtendedBlocklet extends Blocklet {
if (in.readBoolean()) {
indexUniqueId = in.readUTF();
}
- String filePath = getPath();
- boolean isLocalFile = FileFactory.getCarbonFile(filePath) instanceof LocalCarbonFile;
-
- // For external segment, table path need not be appended to filePath as contains has full path
- // Example filepath for ext segment:
- // 1. /home/user/carbondata/integration/spark/newsegmentpath
- // 2. hdfs://hacluster/opt/newsegmentpath/
- // Example filepath for loaded segment: /Fact/Part/Segment0
- // To identify a filePath as ext segment path,
- // for other storage systems (hdfs,s3): filePath doesn't start with File separator.
- // for ubuntu storage: it starts with File separator, so check if given path exists or not.
- if ((!isLocalFile && filePath.startsWith(File.separator)) || (isLocalFile && !FileFactory
- .isFileExist(filePath))) {
- setFilePath(tablePath + filePath);
- } else {
- setFilePath(filePath);
- }
boolean isSplitPresent = in.readBoolean();
if (isSplitPresent) {
+ String filePath = getPath();
+ boolean isExternalSegment = in.readBoolean();
+ if (!isExternalSegment) {
+ setFilePath(tablePath + filePath);
+ } else {
+ setFilePath(filePath);
+ }
// getting the length of the data
final int serializeLen = in.readInt();
this.inputSplit =
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
index 404cbde..3c2e81e 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
@@ -1077,16 +1077,15 @@ object CommonLoadUtils {
case _ =>
}
}
-
- // Pre-priming for Partition table here
- if (!StringUtils.isEmpty(loadParams.carbonLoadModel.getSegmentId)) {
- DistributedRDDUtils.triggerPrepriming(loadParams.sparkSession,
- table,
- Seq(),
- loadParams.operationContext,
- loadParams.hadoopConf,
- List(loadParams.carbonLoadModel.getSegmentId))
- }
+ }
+ // Pre-priming for Partition table here
+ if (!StringUtils.isEmpty(loadParams.carbonLoadModel.getSegmentId)) {
+ DistributedRDDUtils.triggerPrepriming(loadParams.sparkSession,
+ table,
+ Seq(),
+ loadParams.operationContext,
+ loadParams.hadoopConf,
+ List(loadParams.carbonLoadModel.getSegmentId))
}
try {
val compactedSegments = new util.ArrayList[String]()