This is an automated email from the ASF dual-hosted git repository.
kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 25c5687 [CARBONDATA-4141] Index Server is not caching indexes for
external tables with sdk segments
25c5687 is described below
commit 25c5687e2c8514f54aae4617410d3909ad8840e8
Author: Karan980 <[email protected]>
AuthorDate: Tue Mar 2 17:13:49 2021 +0530
[CARBONDATA-4141] Index Server is not caching indexes for external tables
with sdk segments
Why is this PR needed?
Indexes cached in the executor cache are not dropped when drop table is called
for an external table with SDK segments. This is because external tables with SDK
segments do not have metadata such as a table status file, so the drop table command
sends zero segments to the IndexServer clearIndexes job, which then clears nothing
on the executor side. As a result, when this type of table is dropped, the
executor-side indexes are not dropped. If an external table is then created again
at the same location and select * or select count(*) is run, the indexes for this
table are not cached, because indexes for the same location are already present.
Show metacache on the newly created table then uses the new tableId, but the cached
indexes still carry the old tableId of the already-dropped table. So show metacache
returns nothing because of the tableId mismatch.
What changes were proposed in this PR?
Prepared the valid segments from the index files present at the external table
location and sent them to the IndexServer clearIndexes job through IndexInputFormat.
This closes #4099
---
.../apache/carbondata/core/index/IndexUtil.java | 36 +++++++++++++++++-----
.../blockletindex/BlockletIndexFactory.java | 8 +++--
2 files changed, 34 insertions(+), 10 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/index/IndexUtil.java
b/core/src/main/java/org/apache/carbondata/core/index/IndexUtil.java
index c663b78..87d2a40 100644
--- a/core/src/main/java/org/apache/carbondata/core/index/IndexUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/index/IndexUtil.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.core.index;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -36,7 +37,10 @@ import org.apache.carbondata.core.indexstore.PartitionSpec;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
+import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope;
+import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
import org.apache.carbondata.core.util.BlockletIndexUtil;
import org.apache.carbondata.core.util.CarbonProperties;
@@ -112,15 +116,31 @@ public class IndexUtil {
*/
private static void executeClearIndexJob(IndexJob indexJob,
CarbonTable carbonTable, String indexToClear) throws IOException {
- SegmentStatusManager.ValidAndInvalidSegmentsInfo
validAndInvalidSegmentsInfo =
- getValidAndInvalidSegments(carbonTable,
FileFactory.getConfiguration());
- List<String> invalidSegment = new ArrayList<>();
- for (Segment segment : validAndInvalidSegmentsInfo.getInvalidSegments()) {
- invalidSegment.add(segment.getSegmentNo());
+ IndexInputFormat indexInputFormat;
+ if (!carbonTable.isTransactionalTable()) {
+ ReadCommittedScope readCommittedScope =
+ new LatestFilesReadCommittedScope(carbonTable.getTablePath(),
+ FileFactory.getConfiguration());
+ LoadMetadataDetails[] loadMetadataDetails =
readCommittedScope.getSegmentList();
+ List<Segment> listOfValidSegments = new
ArrayList<>(loadMetadataDetails.length);
+ Arrays.stream(loadMetadataDetails).forEach(segment -> {
+ Segment seg = new Segment(segment.getLoadName(),
segment.getSegmentFile());
+ seg.setLoadMetadataDetails(segment);
+ listOfValidSegments.add(seg);
+ });
+ indexInputFormat =
+ new IndexInputFormat(carbonTable, listOfValidSegments, new
ArrayList<>(0), true,
+ indexToClear);
+ } else {
+ SegmentStatusManager.ValidAndInvalidSegmentsInfo
validAndInvalidSegmentsInfo =
+ getValidAndInvalidSegments(carbonTable,
FileFactory.getConfiguration());
+ List<String> invalidSegment = new ArrayList<>();
+ validAndInvalidSegmentsInfo.getInvalidSegments()
+ .forEach(segment -> invalidSegment.add(segment.getSegmentNo()));
+ indexInputFormat =
+ new IndexInputFormat(carbonTable,
validAndInvalidSegmentsInfo.getValidSegments(),
+ invalidSegment, true, indexToClear);
}
- IndexInputFormat indexInputFormat =
- new IndexInputFormat(carbonTable,
validAndInvalidSegmentsInfo.getValidSegments(),
- invalidSegment, true, indexToClear);
try {
indexJob.execute(indexInputFormat, null);
} catch (Exception e) {
diff --git
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletIndexFactory.java
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletIndexFactory.java
index 7989b99..27e1090 100644
---
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletIndexFactory.java
+++
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletIndexFactory.java
@@ -452,8 +452,12 @@ public class BlockletIndexFactory extends
CoarseGrainIndexFactory
BlockletIndexInputSplit distributable = new BlockletIndexInputSplit();
distributable.setSegment(segment);
distributable.setIndexSchema(INDEX_SCHEMA);
-
distributable.setSegmentPath(CarbonTablePath.getSegmentPath(identifier.getTablePath(),
- segment.getSegmentNo()));
+ if (!(getCarbonTable().isTransactionalTable())) {
+ distributable.setSegmentPath(identifier.getTablePath());
+ } else {
+ distributable.setSegmentPath(
+ CarbonTablePath.getSegmentPath(identifier.getTablePath(),
segment.getSegmentNo()));
+ }
distributableList.add(new
IndexInputSplitWrapper(UUID.randomUUID().toString(),
distributable).getDistributable());
} catch (Exception e) {