This is an automated email from the ASF dual-hosted git repository.
ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new f215eaa [CARBONDATA-3759] Refactor segmentRefreshInfo and fix cache
issue in multiple application scenario
f215eaa is described below
commit f215eaad3b80f14032314f27c0a83f0ea77696a1
Author: akashrn5 <[email protected]>
AuthorDate: Sat Mar 28 17:21:30 2020 +0530
[CARBONDATA-3759] Refactor segmentRefreshInfo and fix cache issue in
multiple application scenario
Why is this PR needed?
Currently, segmentRefreshInfo helps to clear the cache only in update cases;
it fails to refresh the cache when the segment files of a segment change or
are updated.
When two applications are running on the same store, one application may
change some segment files, remove the old cache and possibly delete files.
These changes won't be reflected in the other application, which may result
in either wrong results or query failure.
What changes were proposed in this PR?
Refactor segmentRefreshInfo so that the cache is cleared and updated when
there are any updates on segments, and also when the segment files of the
respective segments change.
Does this PR introduce any user interface change?
No
Is any new testcase added?
No
Tested on a cluster; the existing test cases are sufficient.
This closes #3686
---
.../core/datamap/DataMapStoreManager.java | 59 +++++++++++++++-------
.../LatestFilesReadCommittedScope.java | 2 +-
.../TableStatusReadCommittedScope.java | 12 ++++-
.../core/statusmanager/SegmentRefreshInfo.java | 28 +++++-----
.../statusmanager/SegmentUpdateStatusManager.java | 25 ++++-----
.../hadoop/api/CarbonTableInputFormat.java | 16 +++---
.../indexserver/DistributedRDDUtils.scala | 2 +-
.../carbondata/spark/rdd/CarbonMergerRDD.scala | 18 ++++---
.../joins/BroadCastSIFilterPushJoin.scala | 2 +-
9 files changed, 98 insertions(+), 66 deletions(-)
diff --git
a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
index 65ddc5c..cbfa071 100644
---
a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
+++
b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
@@ -34,6 +34,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datamap.dev.IndexFactory;
import
org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.indexstore.BlockletDetailsFetcher;
import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher;
import
org.apache.carbondata.core.indexstore.blockletindex.BlockletIndexFactory;
@@ -44,13 +45,14 @@ import
org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
import org.apache.carbondata.core.metadata.schema.table.DataMapSchemaFactory;
import
org.apache.carbondata.core.metadata.schema.table.DataMapSchemaStorageProvider;
import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
-import org.apache.carbondata.core.mutate.SegmentUpdateDetails;
import org.apache.carbondata.core.mutate.UpdateVO;
import org.apache.carbondata.core.statusmanager.SegmentRefreshInfo;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonSessionInfo;
import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
import static
org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider.MV;
@@ -502,11 +504,12 @@ public final class DataMapStoreManager {
}
public List<String> getSegmentsToBeRefreshed(CarbonTable carbonTable,
- SegmentUpdateStatusManager updateStatusManager, List<Segment>
filteredSegmentToAccess) {
+ List<Segment> filteredSegmentToAccess) {
List<String> toBeCleanedSegments = new ArrayList<>();
for (Segment filteredSegment : filteredSegmentToAccess) {
boolean refreshNeeded =
getTableSegmentRefresher(carbonTable).isRefreshNeeded(filteredSegment,
-
updateStatusManager.getInvalidTimestampRange(filteredSegment.getSegmentNo()));
+ SegmentUpdateStatusManager
+
.getInvalidTimestampRange(filteredSegment.getLoadMetadataDetails()));
if (refreshNeeded) {
toBeCleanedSegments.add(filteredSegment.getSegmentNo());
}
@@ -518,7 +521,7 @@ public final class DataMapStoreManager {
SegmentUpdateStatusManager updateStatusManager, List<Segment>
filteredSegmentToAccess)
throws IOException {
List<String> toBeCleanedSegments =
- getSegmentsToBeRefreshed(carbonTable, updateStatusManager,
filteredSegmentToAccess);
+ getSegmentsToBeRefreshed(carbonTable, filteredSegmentToAccess);
if (toBeCleanedSegments.size() > 0) {
clearInvalidSegments(carbonTable, toBeCleanedSegments);
}
@@ -739,17 +742,30 @@ public final class DataMapStoreManager {
private Map<String, Boolean> manualSegmentRefresh = new HashMap<>();
TableSegmentRefresher(CarbonTable table) {
- SegmentUpdateStatusManager statusManager = new
SegmentUpdateStatusManager(table);
- SegmentUpdateDetails[] updateStatusDetails =
statusManager.getUpdateStatusDetails();
- for (SegmentUpdateDetails updateDetails : updateStatusDetails) {
- UpdateVO updateVO =
statusManager.getInvalidTimestampRange(updateDetails.getSegmentName());
+ SegmentStatusManager segmentStatusManager =
+ new SegmentStatusManager(table.getAbsoluteTableIdentifier());
+ List<Segment> validSegments;
+ try {
+ validSegments =
segmentStatusManager.getValidAndInvalidSegments().getValidSegments();
+ } catch (IOException e) {
+ LOGGER.error("Error while getting the valid segments.", e);
+ throw new RuntimeException(e);
+ }
+ for (Segment segment : validSegments) {
+ UpdateVO updateVO =
+
SegmentUpdateStatusManager.getInvalidTimestampRange(segment.getLoadMetadataDetails());
SegmentRefreshInfo segmentRefreshInfo;
- if (updateVO != null && updateVO.getLatestUpdateTimestamp() != null) {
- segmentRefreshInfo = new
SegmentRefreshInfo(updateVO.getLatestUpdateTimestamp(), 0);
+ if (updateVO != null && updateVO.getLatestUpdateTimestamp() != null
+ || segment.getSegmentFileName() != null) {
+ long segmentFileTimeStamp = FileFactory.getCarbonFile(CarbonTablePath
+ .getSegmentFilePath(table.getTablePath(),
segment.getSegmentFileName()))
+ .getLastModifiedTime();
+ segmentRefreshInfo =
+ new SegmentRefreshInfo(updateVO.getLatestUpdateTimestamp(), 0,
segmentFileTimeStamp);
} else {
- segmentRefreshInfo = new SegmentRefreshInfo(0L, 0);
+ segmentRefreshInfo = new SegmentRefreshInfo(0L, 0, 0L);
}
- segmentRefreshTime.put(updateVO.getSegmentId(), segmentRefreshInfo);
+ segmentRefreshTime.put(segment.getSegmentNo(), segmentRefreshInfo);
}
}
@@ -757,14 +773,23 @@ public final class DataMapStoreManager {
SegmentRefreshInfo segmentRefreshInfo =
seg.getSegmentRefreshInfo(updateVo);
String segmentId = seg.getSegmentNo();
- if (segmentRefreshInfo.getSegmentUpdatedTimestamp() == null) {
+ if (segmentRefreshInfo.getSegmentUpdatedTimestamp() == null
+ && segmentRefreshInfo.getSegmentFileTimestamp() == 0) {
return false;
}
- if (segmentRefreshTime.get(segmentId) == null
- && segmentRefreshInfo.getSegmentUpdatedTimestamp() != 0) {
- segmentRefreshTime.put(segmentId, segmentRefreshInfo);
- return true;
+
+ if (segmentRefreshTime.get(segmentId) == null) {
+ if (segmentRefreshInfo.getSegmentUpdatedTimestamp() != null
+ && segmentRefreshInfo.getSegmentUpdatedTimestamp() != 0) {
+ segmentRefreshTime.put(segmentId, segmentRefreshInfo);
+ return true;
+ }
+ if (segmentRefreshInfo.getSegmentFileTimestamp() != 0) {
+ segmentRefreshTime.put(segmentId, segmentRefreshInfo);
+ return true;
+ }
}
+
if (manualSegmentRefresh.get(segmentId) != null &&
manualSegmentRefresh.get(segmentId)) {
manualSegmentRefresh.put(segmentId, false);
return true;
diff --git
a/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
b/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
index 6975620..7352bca 100644
---
a/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
+++
b/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
@@ -220,7 +220,7 @@ public class LatestFilesReadCommittedScope implements
ReadCommittedScope {
if (indexFileStore.get(timestamp) == null) {
indexList = new ArrayList<>(1);
segmentRefreshInfo =
- new
SegmentRefreshInfo(carbonIndexFiles[i].getLastModifiedTime(), 0);
+ new
SegmentRefreshInfo(carbonIndexFiles[i].getLastModifiedTime(), 0, 0L);
segmentTimestampUpdaterMap.put(timestamp, segmentRefreshInfo);
} else {
// Entry is already present.
diff --git
a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
index 5d4ed4e..17a010e 100644
---
a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
+++
b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
@@ -23,6 +23,7 @@ import java.util.Map;
import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.annotations.InterfaceStability;
import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
import
org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.SegmentFileStore;
@@ -93,10 +94,17 @@ public class TableStatusReadCommittedScope implements
ReadCommittedScope {
public SegmentRefreshInfo getCommittedSegmentRefreshInfo(Segment segment,
UpdateVO updateVo) {
SegmentRefreshInfo segmentRefreshInfo;
+ long segmentFileTimeStamp = 0L;
+ if (null != segment.getSegmentFileName()) {
+ segmentFileTimeStamp = FileFactory.getCarbonFile(CarbonTablePath
+ .getSegmentFilePath(identifier.getTablePath(),
segment.getSegmentFileName()))
+ .getLastModifiedTime();
+ }
if (updateVo != null) {
- segmentRefreshInfo = new
SegmentRefreshInfo(updateVo.getLatestUpdateTimestamp(), 0);
+ segmentRefreshInfo =
+ new SegmentRefreshInfo(updateVo.getLatestUpdateTimestamp(), 0,
segmentFileTimeStamp);
} else {
- segmentRefreshInfo = new SegmentRefreshInfo(0L, 0);
+ segmentRefreshInfo = new SegmentRefreshInfo(0L, 0, segmentFileTimeStamp);
}
return segmentRefreshInfo;
}
diff --git
a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentRefreshInfo.java
b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentRefreshInfo.java
index 88b9176..d7b32cb 100644
---
a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentRefreshInfo.java
+++
b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentRefreshInfo.java
@@ -21,12 +21,17 @@ import java.io.Serializable;
public class SegmentRefreshInfo implements Serializable {
- private Long segmentUpdatedTimestamp;
+ private Long segmentUpdatedTimestamp = 0L;
private Integer countOfFileInSegment;
+ private Long segmentFileTimestamp = 0L;
- public SegmentRefreshInfo(Long segmentUpdatedTimestamp, Integer
countOfFileInSegment) {
- this.segmentUpdatedTimestamp = segmentUpdatedTimestamp;
+ public SegmentRefreshInfo(Long segmentUpdatedTimestamp, Integer
countOfFileInSegment,
+ Long segmentFileTimestamp) {
+ if (segmentUpdatedTimestamp != null) {
+ this.segmentUpdatedTimestamp = segmentUpdatedTimestamp;
+ }
this.countOfFileInSegment = countOfFileInSegment;
+ this.segmentFileTimestamp = segmentFileTimestamp;
}
public Long getSegmentUpdatedTimestamp() {
@@ -37,24 +42,21 @@ public class SegmentRefreshInfo implements Serializable {
this.segmentUpdatedTimestamp = segmentUpdatedTimestamp;
}
- public Integer getCountOfFileInSegment() {
- return countOfFileInSegment;
- }
-
public void setCountOfFileInSegment(Integer countOfFileInSegment) {
this.countOfFileInSegment = countOfFileInSegment;
}
+ public Long getSegmentFileTimestamp() {
+ return segmentFileTimestamp;
+ }
+
public boolean compare(Object o) {
if (!(o instanceof SegmentRefreshInfo)) return false;
SegmentRefreshInfo that = (SegmentRefreshInfo) o;
-
- if (segmentUpdatedTimestamp > that.segmentUpdatedTimestamp ||
!countOfFileInSegment
- .equals(that.countOfFileInSegment)) {
- return true;
- }
- return false;
+ return segmentUpdatedTimestamp > that.segmentUpdatedTimestamp
+ || segmentFileTimestamp > that.segmentFileTimestamp ||
!countOfFileInSegment
+ .equals(that.countOfFileInSegment);
}
@Override
diff --git
a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
index 2d5500b..bb17d53 100644
---
a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
+++
b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
@@ -783,23 +783,18 @@ public class SegmentUpdateStatusManager {
/**
* Returns the invalid timestamp range of a segment.
- * @param segmentId
- * @return
*/
- public UpdateVO getInvalidTimestampRange(String segmentId) {
+ public static UpdateVO getInvalidTimestampRange(LoadMetadataDetails
loadMetadataDetails) {
UpdateVO range = new UpdateVO();
- for (LoadMetadataDetails segment : segmentDetails) {
- if (segment.getLoadName().equalsIgnoreCase(segmentId)) {
- range.setSegmentId(segmentId);
- range.setFactTimestamp(segment.getLoadStartTime());
- if (!segment.getUpdateDeltaStartTimestamp().isEmpty() && !segment
- .getUpdateDeltaEndTimestamp().isEmpty()) {
- range.setUpdateDeltaStartTimestamp(
-
CarbonUpdateUtil.getTimeStampAsLong(segment.getUpdateDeltaStartTimestamp()));
- range.setLatestUpdateTimestamp(
-
CarbonUpdateUtil.getTimeStampAsLong(segment.getUpdateDeltaEndTimestamp()));
- }
- return range;
+ if (loadMetadataDetails != null) {
+ range.setSegmentId(loadMetadataDetails.getLoadName());
+ range.setFactTimestamp(loadMetadataDetails.getLoadStartTime());
+ if (!loadMetadataDetails.getUpdateDeltaStartTimestamp().isEmpty() &&
!loadMetadataDetails
+ .getUpdateDeltaEndTimestamp().isEmpty()) {
+ range.setUpdateDeltaStartTimestamp(CarbonUpdateUtil
+
.getTimeStampAsLong(loadMetadataDetails.getUpdateDeltaStartTimestamp()));
+ range.setLatestUpdateTimestamp(
+
CarbonUpdateUtil.getTimeStampAsLong(loadMetadataDetails.getUpdateDeltaEndTimestamp()));
}
}
return range;
diff --git
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index bdda845..8fab3a9 100644
---
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -359,13 +359,13 @@ public class CarbonTableInputFormat<T> extends
CarbonInputFormat<T> {
validSegments);
} else {
segmentsToBeRefreshed = DataMapStoreManager.getInstance()
- .getSegmentsToBeRefreshed(carbonTable, updateStatusManager,
validSegments);
+ .getSegmentsToBeRefreshed(carbonTable, validSegments);
}
numSegments = validSegments.size();
List<InputSplit> result = new LinkedList<InputSplit>();
UpdateVO invalidBlockVOForSegmentId = null;
- boolean isIUDTable = false;
+ boolean isIUDTable;
isIUDTable = (updateStatusManager.getUpdateStatusDetails().length != 0);
@@ -378,8 +378,8 @@ public class CarbonTableInputFormat<T> extends
CarbonInputFormat<T> {
// Get the UpdateVO for those tables on which IUD operations being
performed.
if (isIUDTable) {
- invalidBlockVOForSegmentId =
-
updateStatusManager.getInvalidTimestampRange(inputSplit.getSegmentId());
+ invalidBlockVOForSegmentId = SegmentUpdateStatusManager
+
.getInvalidTimestampRange(inputSplit.getSegment().getLoadMetadataDetails());
}
String[] deleteDeltaFilePath = null;
if (isIUDTable) {
@@ -453,13 +453,13 @@ public class CarbonTableInputFormat<T> extends
CarbonInputFormat<T> {
SDK is written one set of files with UUID, with same UUID it can write
again.
So, latest files content should reflect the new count by refreshing the
segment */
List<String> toBeCleanedSegments = new ArrayList<>();
- for (Segment eachSegment : filteredSegment) {
+ for (Segment segment : filteredSegment) {
boolean refreshNeeded = DataMapStoreManager.getInstance()
.getTableSegmentRefresher(getOrCreateCarbonTable(job.getConfiguration()))
- .isRefreshNeeded(eachSegment,
-
updateStatusManager.getInvalidTimestampRange(eachSegment.getSegmentNo()));
+ .isRefreshNeeded(segment, SegmentUpdateStatusManager
+ .getInvalidTimestampRange(segment.getLoadMetadataDetails()));
if (refreshNeeded) {
- toBeCleanedSegments.add(eachSegment.getSegmentNo());
+ toBeCleanedSegments.add(segment.getSegmentNo());
}
}
for (Segment segment : allSegments.getInvalidSegments()) {
diff --git
a/integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
b/integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
index 2a7aa2d..d0e2eda 100644
---
a/integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
+++
b/integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
@@ -381,7 +381,7 @@ object DistributedRDDUtils {
// Adding valid segments to segments to be refreshed, so that the
select query
// goes in the same executor.
DataMapStoreManager.getInstance
- .getSegmentsToBeRefreshed(carbonTable, updateStatusManager,
validSegments.toList.asJava)
+ .getSegmentsToBeRefreshed(carbonTable, validSegments.toList.asJava)
val indexServerLoadEvent: IndexServerLoadEvent =
IndexServerLoadEvent(
sparkSession,
diff --git
a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 5c2cc1b..7ff5cc0 100644
---
a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -371,24 +371,26 @@ class CarbonMergerRDD[K, V](
.map(_.asInstanceOf[CarbonInputSplit])
.filter { split => FileFormat.COLUMNAR_V3.equals(split.getFileFormat)
}.toList.asJava
}
- val filteredSplits =
splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).filter { entry =>
- val segmentId = Segment.toSegment(entry.getSegmentId).getSegmentNo
- val blockInfo = new TableBlockInfo(entry.getFilePath,
- entry.getStart, entry.getSegmentId,
- entry.getLocations, entry.getLength, entry.getVersion,
+ val filteredSplits =
splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).filter { inputSplit =>
+ val segmentId = Segment.toSegment(inputSplit.getSegmentId).getSegmentNo
+ val blockInfo = new TableBlockInfo(inputSplit.getFilePath,
+ inputSplit.getStart, inputSplit.getSegmentId,
+ inputSplit.getLocations, inputSplit.getLength, inputSplit.getVersion,
updateStatusManager.getDeleteDeltaFilePath(
- entry.getFilePath,
+ inputSplit.getFilePath,
segmentId)
)
if (updateStatusManager.getUpdateStatusDetails.length != 0) {
- updateDetails = updateStatusManager.getInvalidTimestampRange(segmentId)
+ updateDetails =
SegmentUpdateStatusManager.getInvalidTimestampRange(inputSplit
+ .getSegment
+ .getLoadMetadataDetails)
}
// filter splits with V3 data file format
// if split is updated, then check for if it is valid segment based on
update details
(!updated ||
(updated && (!CarbonUtil.isInvalidTableBlock(blockInfo.getSegmentId,
blockInfo.getFilePath,
updateDetails, updateStatusManager)))) &&
- FileFormat.COLUMNAR_V3.equals(entry.getFileFormat)
+ FileFormat.COLUMNAR_V3.equals(inputSplit.getFileFormat)
}
if (rangeColumn != null) {
totalTaskCount = totalTaskCount +
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/joins/BroadCastSIFilterPushJoin.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/joins/BroadCastSIFilterPushJoin.scala
index 16a2091..b75380c 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/joins/BroadCastSIFilterPushJoin.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/joins/BroadCastSIFilterPushJoin.scala
@@ -337,7 +337,7 @@ object BroadCastSIFilterPushJoin {
if (CarbonProperties.getInstance
.isDistributedPruningEnabled(carbonTable.getDatabaseName,
carbonTable.getTableName)) {
val segmentsToBeRefreshed: util.List[String] =
DataMapStoreManager.getInstance
- .getSegmentsToBeRefreshed(carbonTable, updateStatusManager,
validSegmentsToAccess)
+ .getSegmentsToBeRefreshed(carbonTable, validSegmentsToAccess)
try {
val dataMapFormat: IndexInputFormat =
new IndexInputFormat(carbonTable,