akashrn5 commented on a change in pull request #3701:
URL: https://github.com/apache/carbondata/pull/3701#discussion_r413567879
##########
File path:
core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
##########
@@ -162,6 +165,49 @@ public void
readAllIIndexOfSegment(SegmentFileStore.SegmentFile segmentFile, Str
}
}
+ /**
+ * Get all index files list
+ *
+ * @param segmentFile
+ */
+ public List<CarbonFile> getAllIIndexOfSegment(SegmentFileStore.SegmentFile
segmentFile,
Review comment:
Please remove this method; it duplicates existing code. Please reuse
`org.apache.carbondata.core.metadata.SegmentFileStore#getIndexCarbonFiles` instead.
##########
File path:
core/src/main/java/org/apache/carbondata/core/index/SegmentIndexMeta.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.index;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+
+public class SegmentIndexMeta {
Review comment:
please add class level comment
##########
File path:
core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
##########
@@ -289,6 +335,50 @@ public void readMergeFile(String mergeFilePath) throws
IOException {
}
}
+ /**
+ * Read carbonindexmerge file and get data count
+ *
+ * @param mergeFilePath
+ * @throws IOException
+ */
+ public int readMergeIdxFileGetRowCount(String mergeFilePath) throws
IOException {
+ ThriftReader thriftReader = new ThriftReader(mergeFilePath, configuration);
+ int rowCount = 0;
+ try {
+ thriftReader.open();
+ MergedBlockIndexHeader indexHeader =
readMergeBlockIndexHeader(thriftReader);
+ MergedBlockIndex mergedBlockIndex = readMergeBlockIndex(thriftReader);
+ List<String> file_names = indexHeader.getFile_names();
+ List<ByteBuffer> fileData = mergedBlockIndex.getFileData();
+ assert (file_names.size() == fileData.size());
+ for (int i = 0; i < file_names.size(); i++) {
+ byte[] data = fileData.get(i).array();
+ DataFileFooterConverter fileFooterConverter = new
DataFileFooterConverter(configuration);
+ List<DataFileFooter> fileFooters =
+ fileFooterConverter.getIndexInfo(file_names.get(i), data);
+ for (DataFileFooter footer : fileFooters) {
+ rowCount += footer.getNumberOfRows();
+ }
+ }
+ return rowCount;
+ } finally {
+ thriftReader.close();
+ }
+ }
+
+ public int readIdxFileGetRowCount(String indexFilePath) throws IOException {
Review comment:
Same as above: please reuse
`org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore#readIndexFile`.
Please avoid code duplication.
##########
File path:
core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
##########
@@ -289,6 +335,50 @@ public void readMergeFile(String mergeFilePath) throws
IOException {
}
}
+ /**
+ * Read carbonindexmerge file and get data count
+ *
+ * @param mergeFilePath
+ * @throws IOException
+ */
+ public int readMergeIdxFileGetRowCount(String mergeFilePath) throws
IOException {
Review comment:
Please remove this method and reuse the
`org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore#readMergeFile`
method to get the info from there. If that is not directly possible, I suggest
doing a proper refactor and getting the row count from there.
##########
File path:
integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
##########
@@ -154,8 +155,32 @@ object CountStarPlan {
return false
}
child collect {
- case cd: Filter => return false
+ case filter: Filter => if (isPurePartitionPrune(child)) return true else
return false
}
true
}
+
Review comment:
Please add a comment explaining what exactly a pure partition means.
##########
File path:
integration/spark/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
##########
@@ -108,4 +125,145 @@ case class CarbonCountStar(
CarbonInputFormatUtil.setDataMapJobIfConfigured(job.getConfiguration)
(job, carbonInputFormat)
}
+
+ // The detail of query flow as following for pure partition count star:
+ // Step 1. check whether it is pure partition count star by filter
+ // Step 2. read tablestatus to get all valid segments, remove the segment
file cache of invalid
+ // segment and expired segment
+ // Step 3. use multi-thread to read segment files which not in cache and
cache index files list
+ // of each segment into memory. If its index files already exist in cache,
not required to
+ // read again.
+ // Step 4. use multi-thread to prune segment and partition to get pruned
index file list, which
+ // can prune most index files and reduce the files num.
+ // Step 5. read the count from pruned index file directly and cache it, get
from cache if exist
+ // in the index_file <-> rowCount map.
+ private def getRowCountPurePartitionPrune: Long = {
+ var rowCount: Long = 0
+ val prunedPartitionPaths = new java.util.ArrayList[String]()
+ // Get the current partitions from table.
+ val partitions = CarbonFilters.getPrunedPartitions(relation, predicates)
+ if (partitions != null) {
+ for (partition <- partitions) {
+ prunedPartitionPaths.add(partition.getLocation.toString)
+ }
+ val details =
SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+ val validSegmentPaths = details.filter(segment =>
+ ((segment.getSegmentStatus == SegmentStatus.SUCCESS) ||
+ (segment.getSegmentStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS))
+ && segment.getSegmentFile != null).map(segment =>
segment.getSegmentFile)
+ val tableSegmentIndexes =
DataMapStoreManager.getInstance().getAllSegmentIndexes(
+ carbonTable.getTableId)
+ if (!tableSegmentIndexes.isEmpty) {
+ // clear invalid cache
+ for (segmentFilePathInCache <- tableSegmentIndexes.keySet().asScala) {
+ if (!validSegmentPaths.contains(segmentFilePathInCache)) {
+ // means invalid cache
+ tableSegmentIndexes.remove(segmentFilePathInCache)
+ }
+ }
+ }
+ // init and put absent the valid cache
+ for (validSegmentPath <- validSegmentPaths) {
+ if (tableSegmentIndexes.get(validSegmentPath) == null) {
+ val segmentIndexMeta = new SegmentIndexMeta(validSegmentPath)
+ tableSegmentIndexes.put(validSegmentPath, segmentIndexMeta)
+ }
+ }
+
+ val numThreads = Math.min(Math.max(validSegmentPaths.length, 1), 4)
Review comment:
@Zhangshunyu @QiangCai loading a separate cache again just for count(*) and
using driver memory might impact the query time and memory usage of other
queries in the driver, and I feel count(*) is not a very frequent query.
##########
File path:
integration/spark/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
##########
@@ -108,4 +125,145 @@ case class CarbonCountStar(
CarbonInputFormatUtil.setIndexJobIfConfigured(job.getConfiguration)
(job, carbonInputFormat)
}
+
+ // The detail of query flow as following for pure partition count star:
+ // Step 1. check whether it is pure partition count star by filter
+ // Step 2. read tablestatus to get all valid segments, remove the segment
file cache of invalid
+ // segment and expired segment
+ // Step 3. use multi-thread to read segment files which not in cache and
cache index files list
+ // of each segment into memory. If its index files already exist in cache,
not required to
+ // read again.
+ // Step 4. use multi-thread to prune segment and partition to get pruned
index file list, which
+ // can prune most index files and reduce the files num.
+ // Step 5. read the count from pruned index file directly and cache it, get
from cache if exist
+ // in the index_file <-> rowCount map.
+ private def getRowCountPurePartitionPrune: Long = {
+ var rowCount: Long = 0
+ val prunedPartitionPaths = new java.util.HashSet[String]()
+ // Get the current partitions from table.
+ val partitions = CarbonFilters.getPrunedPartitions(relation, predicates)
+ if (partitions != null) {
+ for (partition <- partitions) {
+ prunedPartitionPaths.add(partition.getLocation.toString)
+ }
+ val details =
SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
Review comment:
Instead of getting the details and then filtering, please use
`org.apache.carbondata.core.statusmanager.SegmentStatusManager.ValidAndInvalidSegmentsInfo#getValidSegments`;
you can then get the segmentFileNames from it.
##########
File path:
integration/spark/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
##########
@@ -108,4 +125,145 @@ case class CarbonCountStar(
CarbonInputFormatUtil.setIndexJobIfConfigured(job.getConfiguration)
(job, carbonInputFormat)
}
+
+ // The detail of query flow as following for pure partition count star:
+ // Step 1. check whether it is pure partition count star by filter
+ // Step 2. read tablestatus to get all valid segments, remove the segment
file cache of invalid
+ // segment and expired segment
+ // Step 3. use multi-thread to read segment files which not in cache and
cache index files list
+ // of each segment into memory. If its index files already exist in cache,
not required to
+ // read again.
+ // Step 4. use multi-thread to prune segment and partition to get pruned
index file list, which
+ // can prune most index files and reduce the files num.
+ // Step 5. read the count from pruned index file directly and cache it, get
from cache if exist
+ // in the index_file <-> rowCount map.
+ private def getRowCountPurePartitionPrune: Long = {
+ var rowCount: Long = 0
+ val prunedPartitionPaths = new java.util.HashSet[String]()
+ // Get the current partitions from table.
+ val partitions = CarbonFilters.getPrunedPartitions(relation, predicates)
+ if (partitions != null) {
+ for (partition <- partitions) {
+ prunedPartitionPaths.add(partition.getLocation.toString)
+ }
+ val details =
SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+ val validSegmentPaths = details.filter(segment =>
+ ((segment.getSegmentStatus == SegmentStatus.SUCCESS) ||
+ (segment.getSegmentStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS))
+ && segment.getSegmentFile != null).map(segment =>
segment.getSegmentFile)
+ val tableSegmentIndexes =
IndexStoreManager.getInstance().getAllSegmentIndexes(
Review comment:
What about old stores, where we do not have segment files? That case is not
handled here; please handle it.
##########
File path:
core/src/main/java/org/apache/carbondata/core/index/SegmentIndexMeta.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.index;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+
+public class SegmentIndexMeta {
+ private String segmentFilePath;
+ private final List<CarbonFile> segmentIndexFiles = new ArrayList<>();
+ private final Map<String, Long> prunedIndexFileToRowCountMap = new
ConcurrentHashMap<>();
+
+ public SegmentIndexMeta(String segmentFilePath) {
Review comment:
How are you handling the case of old segments, where the segment file won't be
present?
##########
File path:
integration/spark/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
##########
@@ -108,4 +125,145 @@ case class CarbonCountStar(
CarbonInputFormatUtil.setIndexJobIfConfigured(job.getConfiguration)
(job, carbonInputFormat)
}
+
+ // The detail of query flow as following for pure partition count star:
+ // Step 1. check whether it is pure partition count star by filter
+ // Step 2. read tablestatus to get all valid segments, remove the segment
file cache of invalid
+ // segment and expired segment
+ // Step 3. use multi-thread to read segment files which not in cache and
cache index files list
+ // of each segment into memory. If its index files already exist in cache,
not required to
+ // read again.
+ // Step 4. use multi-thread to prune segment and partition to get pruned
index file list, which
+ // can prune most index files and reduce the files num.
+ // Step 5. read the count from pruned index file directly and cache it, get
from cache if exist
+ // in the index_file <-> rowCount map.
+ private def getRowCountPurePartitionPrune: Long = {
+ var rowCount: Long = 0
+ val prunedPartitionPaths = new java.util.HashSet[String]()
+ // Get the current partitions from table.
+ val partitions = CarbonFilters.getPrunedPartitions(relation, predicates)
+ if (partitions != null) {
+ for (partition <- partitions) {
+ prunedPartitionPaths.add(partition.getLocation.toString)
+ }
+ val details =
SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+ val validSegmentPaths = details.filter(segment =>
+ ((segment.getSegmentStatus == SegmentStatus.SUCCESS) ||
+ (segment.getSegmentStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS))
+ && segment.getSegmentFile != null).map(segment =>
segment.getSegmentFile)
+ val tableSegmentIndexes =
IndexStoreManager.getInstance().getAllSegmentIndexes(
+ carbonTable.getTableId)
+ if (!tableSegmentIndexes.isEmpty) {
+ // clear invalid cache
+ for (segmentFilePathInCache <- tableSegmentIndexes.keySet().asScala) {
+ if (!validSegmentPaths.contains(segmentFilePathInCache)) {
+ // means invalid cache
+ tableSegmentIndexes.remove(segmentFilePathInCache)
+ }
+ }
+ }
+ // init and put absent the valid cache
+ for (validSegmentPath <- validSegmentPaths) {
+ if (tableSegmentIndexes.get(validSegmentPath) == null) {
+ val segmentIndexMeta = new SegmentIndexMeta(validSegmentPath)
+ tableSegmentIndexes.put(validSegmentPath, segmentIndexMeta)
+ }
+ }
+
+ val numThreads = Math.min(Math.max(validSegmentPaths.length, 1), 4)
+ val executorService = Executors.newFixedThreadPool(numThreads)
+ // to get the index files of valid segments from cache or scan through
each segment
+ val futures = new java.util.ArrayList[Future[Long]]
+ for (segmentPath <- validSegmentPaths) {
+ futures.add(executorService.submit(new Callable[Long] {
+ override def call(): Long = {
+ var rowCountCurrentSeg: Long = 0
+ var indexFilesCurrentSeg: java.util.List[CarbonFile] = null;
+ // tableSegmentIndexes already init.
+ if (tableSegmentIndexes.get(segmentPath) != null &&
+ tableSegmentIndexes.get(segmentPath).getSegmentIndexFiles.size()
> 0) {
+ indexFilesCurrentSeg =
+ tableSegmentIndexes.get(segmentPath).getSegmentIndexFiles
+ } else {
+ // read from seg file
+ val sfs = new SegmentFileStore(carbonTable.getTablePath,
segmentPath)
Review comment:
This will throw a NullPointerException if the segmentFileName is null for a
segment; or, at line number 191, `indexFilesCurrentSeg` will be null.
##########
File path:
integration/spark/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
##########
@@ -108,4 +125,145 @@ case class CarbonCountStar(
CarbonInputFormatUtil.setIndexJobIfConfigured(job.getConfiguration)
(job, carbonInputFormat)
}
+
+ // The detail of query flow as following for pure partition count star:
+ // Step 1. check whether it is pure partition count star by filter
+ // Step 2. read tablestatus to get all valid segments, remove the segment
file cache of invalid
+ // segment and expired segment
+ // Step 3. use multi-thread to read segment files which not in cache and
cache index files list
+ // of each segment into memory. If its index files already exist in cache,
not required to
+ // read again.
+ // Step 4. use multi-thread to prune segment and partition to get pruned
index file list, which
+ // can prune most index files and reduce the files num.
+ // Step 5. read the count from pruned index file directly and cache it, get
from cache if exist
+ // in the index_file <-> rowCount map.
+ private def getRowCountPurePartitionPrune: Long = {
+ var rowCount: Long = 0
+ val prunedPartitionPaths = new java.util.HashSet[String]()
+ // Get the current partitions from table.
+ val partitions = CarbonFilters.getPrunedPartitions(relation, predicates)
+ if (partitions != null) {
+ for (partition <- partitions) {
+ prunedPartitionPaths.add(partition.getLocation.toString)
+ }
+ val details =
SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+ val validSegmentPaths = details.filter(segment =>
+ ((segment.getSegmentStatus == SegmentStatus.SUCCESS) ||
+ (segment.getSegmentStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS))
+ && segment.getSegmentFile != null).map(segment =>
segment.getSegmentFile)
+ val tableSegmentIndexes =
IndexStoreManager.getInstance().getAllSegmentIndexes(
+ carbonTable.getTableId)
+ if (!tableSegmentIndexes.isEmpty) {
+ // clear invalid cache
+ for (segmentFilePathInCache <- tableSegmentIndexes.keySet().asScala) {
+ if (!validSegmentPaths.contains(segmentFilePathInCache)) {
+ // means invalid cache
+ tableSegmentIndexes.remove(segmentFilePathInCache)
+ }
+ }
+ }
+ // init and put absent the valid cache
+ for (validSegmentPath <- validSegmentPaths) {
+ if (tableSegmentIndexes.get(validSegmentPath) == null) {
+ val segmentIndexMeta = new SegmentIndexMeta(validSegmentPath)
+ tableSegmentIndexes.put(validSegmentPath, segmentIndexMeta)
+ }
+ }
+
+ val numThreads = Math.min(Math.max(validSegmentPaths.length, 1), 4)
+ val executorService = Executors.newFixedThreadPool(numThreads)
+ // to get the index files of valid segments from cache or scan through
each segment
+ val futures = new java.util.ArrayList[Future[Long]]
+ for (segmentPath <- validSegmentPaths) {
+ futures.add(executorService.submit(new Callable[Long] {
+ override def call(): Long = {
+ var rowCountCurrentSeg: Long = 0
+ var indexFilesCurrentSeg: java.util.List[CarbonFile] = null;
+ // tableSegmentIndexes already init.
+ if (tableSegmentIndexes.get(segmentPath) != null &&
+ tableSegmentIndexes.get(segmentPath).getSegmentIndexFiles.size()
> 0) {
+ indexFilesCurrentSeg =
+ tableSegmentIndexes.get(segmentPath).getSegmentIndexFiles
+ } else {
+ // read from seg file
+ val sfs = new SegmentFileStore(carbonTable.getTablePath,
segmentPath)
+ val indexFileStore = new SegmentIndexFileStore
+ indexFilesCurrentSeg =
indexFileStore.getAllIIndexOfSegment(sfs.getSegmentFile,
+ carbonTable.getTablePath, SegmentStatus.SUCCESS, false)
+ // tableSegmentIndexes already init and segmentIndexMeta not
null.
+ val segmentIndexMeta = tableSegmentIndexes.get(segmentPath)
+ segmentIndexMeta.addSegmentIndexFiles(indexFilesCurrentSeg)
+ // cache all the index files of this segment
+ tableSegmentIndexes.put(segmentPath, segmentIndexMeta)
+ }
+
+ // use partition prune to get row count of each pruned index file
and
+ // cache it for this segment
+ var prunedIndexFilesCurrentSeg: java.util.List[String] =
+ new java.util.ArrayList[String]()
+ for (indexFile <- indexFilesCurrentSeg.asScala) {
+ // check whether the index files located in pruned partition
+ val formattedPath = indexFile.getAbsolutePath
+ .replace("\\", "/")
+ if (prunedPartitionPaths.contains(
+ formattedPath.substring(0, formattedPath.lastIndexOf("/")))) {
+ prunedIndexFilesCurrentSeg.add(formattedPath)
+ }
+ }
+ // get the row count from cache or read it from pruned index files
+ var toReadIndexFiles: java.util.List[String] = new
java.util.ArrayList[String]()
+ prunedIndexFilesCurrentSeg.asScala.foreach(prunedIndexFilePath =>
+ if
(tableSegmentIndexes.get(segmentPath).getPrunedIndexFileToRowCountMap
Review comment:
Please refactor: assign
`tableSegmentIndexes.get(segmentPath).getPrunedIndexFileToRowCountMap
.get(prunedIndexFilePath)` to a variable, and to update it, add a setter for
the map in `SegmentIndexMeta`.
##########
File path:
integration/spark/src/test/scala/org/apache/carbondata/index/lucene/LuceneFineGrainIndexSuite.scala
##########
@@ -531,7 +531,7 @@ class LuceneFineGrainIndexSuite extends QueryTest with
BeforeAndAfterAll {
sql("drop index dm on table index_test_overwrite")
}
- test("explain query with lucene index") {
+ ignore("explain query with lucene index") {
Review comment:
why this change?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]