akashrn5 commented on a change in pull request #3701:
URL: https://github.com/apache/carbondata/pull/3701#discussion_r413567879



##########
File path: 
core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
##########
@@ -162,6 +165,49 @@ public void readAllIIndexOfSegment(SegmentFileStore.SegmentFile segmentFile, Str
     }
   }
 
+  /**
+   * Get all index files list
+   *
+   * @param segmentFile
+   */
+  public List<CarbonFile> getAllIIndexOfSegment(SegmentFileStore.SegmentFile segmentFile,

Review comment:
       Please remove this method; it duplicates existing code. Reuse 
`org.apache.carbondata.core.metadata.SegmentFileStore#getIndexCarbonFiles` instead.

##########
File path: 
core/src/main/java/org/apache/carbondata/core/index/SegmentIndexMeta.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.index;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+
+public class SegmentIndexMeta {

Review comment:
       Please add a class-level comment.

##########
File path: 
core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
##########
@@ -289,6 +335,50 @@ public void readMergeFile(String mergeFilePath) throws IOException {
     }
   }
 
+  /**
+   * Read carbonindexmerge file and get data count
+   *
+   * @param mergeFilePath
+   * @throws IOException
+   */
+  public int readMergeIdxFileGetRowCount(String mergeFilePath) throws IOException {
+    ThriftReader thriftReader = new ThriftReader(mergeFilePath, configuration);
+    int rowCount = 0;
+    try {
+      thriftReader.open();
+      MergedBlockIndexHeader indexHeader = readMergeBlockIndexHeader(thriftReader);
+      MergedBlockIndex mergedBlockIndex = readMergeBlockIndex(thriftReader);
+      List<String> file_names = indexHeader.getFile_names();
+      List<ByteBuffer> fileData = mergedBlockIndex.getFileData();
+      assert (file_names.size() == fileData.size());
+      for (int i = 0; i < file_names.size(); i++) {
+        byte[] data = fileData.get(i).array();
+        DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter(configuration);
+        List<DataFileFooter> fileFooters =
+                fileFooterConverter.getIndexInfo(file_names.get(i), data);
+        for (DataFileFooter footer : fileFooters) {
+          rowCount += footer.getNumberOfRows();
+        }
+      }
+      return rowCount;
+    } finally {
+      thriftReader.close();
+    }
+  }
+
+  public int readIdxFileGetRowCount(String indexFilePath) throws IOException {

Review comment:
       Same as above: please reuse 
`org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore#readIndexFile`
 and avoid code duplication.

##########
File path: 
core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
##########
@@ -289,6 +335,50 @@ public void readMergeFile(String mergeFilePath) throws IOException {
     }
   }
 
+  /**
+   * Read carbonindexmerge file and get data count
+   *
+   * @param mergeFilePath
+   * @throws IOException
+   */
+  public int readMergeIdxFileGetRowCount(String mergeFilePath) throws IOException {

Review comment:
       Please remove this method and reuse the 
`org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore#readMergeFile`
 method to get the information from there. If that is not directly possible, I 
suggest refactoring it properly so that the row count can be obtained from there.
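
A minimal sketch of the kind of refactor I mean, not the actual CarbonData code: one method walks the merge file, and both the existing loader and the row count go through it. Every name below (`MergeFileReaderSketch`, `Entry`, `loadAll`, `rowCount`) is hypothetical, and the Thrift reading and footer conversion are replaced by plain in-memory stand-ins.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Illustration only: every name here is hypothetical, not CarbonData API.
final class MergeFileReaderSketch {

  // Stand-in for one entry of a merge index file: an index file name plus
  // the row counts of the footers decoded from its data.
  static final class Entry {
    final String fileName;
    final long[] footerRowCounts;

    Entry(String fileName, long... footerRowCounts) {
      this.fileName = fileName;
      this.footerRowCounts = footerRowCounts;
    }
  }

  // The only method that walks the merge file; callers say what to do per entry.
  static void readMergeFile(List<Entry> mergeFile, BiConsumer<String, long[]> visitor) {
    for (Entry entry : mergeFile) {
      visitor.accept(entry.fileName, entry.footerRowCounts);
    }
  }

  // Caller 1: collect every entry into a map.
  static Map<String, long[]> loadAll(List<Entry> mergeFile) {
    Map<String, long[]> result = new LinkedHashMap<>();
    readMergeFile(mergeFile, result::put);
    return result;
  }

  // Caller 2: sum the row counts through the same traversal.
  static long rowCount(List<Entry> mergeFile) {
    long[] total = {0L}; // one-element array so the lambda can update it
    readMergeFile(mergeFile, (fileName, counts) -> {
      for (long count : counts) {
        total[0] += count;
      }
    });
    return total[0];
  }
}
```

With this shape the open/read/close handling lives in one place, and the row count becomes a small caller instead of a second copy of the loop.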

##########
File path: 
integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
##########
@@ -154,8 +155,32 @@ object CountStarPlan {
       return false
     }
     child collect {
-      case cd: Filter => return false
+      case filter: Filter => if (isPurePartitionPrune(child)) return true else return false
     }
     true
   }
+

Review comment:
       Please add a comment explaining what exactly "pure partition" means here.

##########
File path: 
integration/spark/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
##########
@@ -108,4 +125,145 @@ case class CarbonCountStar(
     CarbonInputFormatUtil.setDataMapJobIfConfigured(job.getConfiguration)
     (job, carbonInputFormat)
   }
+
+  // The detail of query flow as following for pure partition count star:
+  // Step 1. check whether it is pure partition count star by filter
+  // Step 2. read tablestatus to get all valid segments, remove the segment file cache of invalid
+  // segment and expired segment
+  // Step 3. use multi-thread to read segment files which not in cache and cache index files list
+  // of each segment into memory. If its index files already exist in cache, not required to
+  // read again.
+  // Step 4. use multi-thread to prune segment and partition to get pruned index file list, which
+  // can prune most index files and reduce the files num.
+  // Step 5. read the count from pruned index file directly and cache it, get from cache if exist
+  // in the index_file <-> rowCount map.
+  private def getRowCountPurePartitionPrune: Long = {
+    var rowCount: Long = 0
+    val prunedPartitionPaths = new java.util.ArrayList[String]()
+    // Get the current partitions from table.
+    val partitions = CarbonFilters.getPrunedPartitions(relation, predicates)
+    if (partitions != null) {
+      for (partition <- partitions) {
+        prunedPartitionPaths.add(partition.getLocation.toString)
+      }
+      val details = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+      val validSegmentPaths = details.filter(segment =>
+        ((segment.getSegmentStatus == SegmentStatus.SUCCESS) ||
+          (segment.getSegmentStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS))
+          && segment.getSegmentFile != null).map(segment => segment.getSegmentFile)
+      val tableSegmentIndexes = DataMapStoreManager.getInstance().getAllSegmentIndexes(
+        carbonTable.getTableId)
+      if (!tableSegmentIndexes.isEmpty) {
+        // clear invalid cache
+        for (segmentFilePathInCache <- tableSegmentIndexes.keySet().asScala) {
+          if (!validSegmentPaths.contains(segmentFilePathInCache)) {
+            // means invalid cache
+            tableSegmentIndexes.remove(segmentFilePathInCache)
+          }
+        }
+      }
+      // init and put absent the valid cache
+      for (validSegmentPath <- validSegmentPaths) {
+        if (tableSegmentIndexes.get(validSegmentPath) == null) {
+          val segmentIndexMeta = new SegmentIndexMeta(validSegmentPath)
+          tableSegmentIndexes.put(validSegmentPath, segmentIndexMeta)
+        }
+      }
+
+      val numThreads = Math.min(Math.max(validSegmentPaths.length, 1), 4)

Review comment:
       @Zhangshunyu @QiangCai Loading a separate cache just for count(*) uses 
driver memory, which might impact the run time and memory usage of other 
queries in the driver. I also feel that count(*) is not a very frequent query.

##########
File path: 
integration/spark/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
##########
@@ -108,4 +125,145 @@ case class CarbonCountStar(
     CarbonInputFormatUtil.setIndexJobIfConfigured(job.getConfiguration)
     (job, carbonInputFormat)
   }
+
+  // The detail of query flow as following for pure partition count star:
+  // Step 1. check whether it is pure partition count star by filter
+  // Step 2. read tablestatus to get all valid segments, remove the segment file cache of invalid
+  // segment and expired segment
+  // Step 3. use multi-thread to read segment files which not in cache and cache index files list
+  // of each segment into memory. If its index files already exist in cache, not required to
+  // read again.
+  // Step 4. use multi-thread to prune segment and partition to get pruned index file list, which
+  // can prune most index files and reduce the files num.
+  // Step 5. read the count from pruned index file directly and cache it, get from cache if exist
+  // in the index_file <-> rowCount map.
+  private def getRowCountPurePartitionPrune: Long = {
+    var rowCount: Long = 0
+    val prunedPartitionPaths = new java.util.HashSet[String]()
+    // Get the current partitions from table.
+    val partitions = CarbonFilters.getPrunedPartitions(relation, predicates)
+    if (partitions != null) {
+      for (partition <- partitions) {
+        prunedPartitionPaths.add(partition.getLocation.toString)
+      }
+      val details = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)

Review comment:
       Instead of reading the details and then filtering them, please use 
`org.apache.carbondata.core.statusmanager.SegmentStatusManager.ValidAndInvalidSegmentsInfo#getValidSegments`
 and get the segment file names from the result.

##########
File path: 
integration/spark/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
##########
@@ -108,4 +125,145 @@ case class CarbonCountStar(
     CarbonInputFormatUtil.setIndexJobIfConfigured(job.getConfiguration)
     (job, carbonInputFormat)
   }
+
+  // The detail of query flow as following for pure partition count star:
+  // Step 1. check whether it is pure partition count star by filter
+  // Step 2. read tablestatus to get all valid segments, remove the segment file cache of invalid
+  // segment and expired segment
+  // Step 3. use multi-thread to read segment files which not in cache and cache index files list
+  // of each segment into memory. If its index files already exist in cache, not required to
+  // read again.
+  // Step 4. use multi-thread to prune segment and partition to get pruned index file list, which
+  // can prune most index files and reduce the files num.
+  // Step 5. read the count from pruned index file directly and cache it, get from cache if exist
+  // in the index_file <-> rowCount map.
+  private def getRowCountPurePartitionPrune: Long = {
+    var rowCount: Long = 0
+    val prunedPartitionPaths = new java.util.HashSet[String]()
+    // Get the current partitions from table.
+    val partitions = CarbonFilters.getPrunedPartitions(relation, predicates)
+    if (partitions != null) {
+      for (partition <- partitions) {
+        prunedPartitionPaths.add(partition.getLocation.toString)
+      }
+      val details = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+      val validSegmentPaths = details.filter(segment =>
+        ((segment.getSegmentStatus == SegmentStatus.SUCCESS) ||
+          (segment.getSegmentStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS))
+          && segment.getSegmentFile != null).map(segment => segment.getSegmentFile)
+      val tableSegmentIndexes = IndexStoreManager.getInstance().getAllSegmentIndexes(

Review comment:
       What about old stores, where we do not have segment files? That case 
is not handled here. Please handle it.
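
One possible way to handle it, sketched with hypothetical names (`LegacySegmentCheckSketch`, `SegmentEntry`, `canUseSegmentFilePath` are not CarbonData API): take the segment-file based path only when every valid segment actually has a segment file, and otherwise fall back to the existing count(*) flow. This is an assumption about a reasonable fallback, not a statement of how the PR must do it.

```java
import java.util.List;

// Illustration only: hypothetical names, not CarbonData API.
final class LegacySegmentCheckSketch {

  // Stand-in for one valid entry of the table status file.
  static final class SegmentEntry {
    final String segmentId;
    final String segmentFileName; // null for stores written without segment files

    SegmentEntry(String segmentId, String segmentFileName) {
      this.segmentId = segmentId;
      this.segmentFileName = segmentFileName;
    }
  }

  // True only when every valid segment can be resolved through a segment file.
  static boolean canUseSegmentFilePath(List<SegmentEntry> validSegments) {
    for (SegmentEntry segment : validSegments) {
      if (segment.segmentFileName == null) {
        return false; // at least one old-store segment: use the regular flow
      }
    }
    return true;
  }
}
```

Filtering such segments out instead would silently drop their rows from the count, which is why the sketch returns a yes/no answer for the whole table.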

##########
File path: 
core/src/main/java/org/apache/carbondata/core/index/SegmentIndexMeta.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.index;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+
+public class SegmentIndexMeta {
+  private String segmentFilePath;
+  private final List<CarbonFile> segmentIndexFiles = new ArrayList<>();
+  private final Map<String, Long> prunedIndexFileToRowCountMap = new ConcurrentHashMap<>();
+
+  public SegmentIndexMeta(String segmentFilePath) {

Review comment:
       How are you handling old segments, where the segment file won't be 
present?

##########
File path: 
integration/spark/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
##########
@@ -108,4 +125,145 @@ case class CarbonCountStar(
     CarbonInputFormatUtil.setIndexJobIfConfigured(job.getConfiguration)
     (job, carbonInputFormat)
   }
+
+  // The detail of query flow as following for pure partition count star:
+  // Step 1. check whether it is pure partition count star by filter
+  // Step 2. read tablestatus to get all valid segments, remove the segment file cache of invalid
+  // segment and expired segment
+  // Step 3. use multi-thread to read segment files which not in cache and cache index files list
+  // of each segment into memory. If its index files already exist in cache, not required to
+  // read again.
+  // Step 4. use multi-thread to prune segment and partition to get pruned index file list, which
+  // can prune most index files and reduce the files num.
+  // Step 5. read the count from pruned index file directly and cache it, get from cache if exist
+  // in the index_file <-> rowCount map.
+  private def getRowCountPurePartitionPrune: Long = {
+    var rowCount: Long = 0
+    val prunedPartitionPaths = new java.util.HashSet[String]()
+    // Get the current partitions from table.
+    val partitions = CarbonFilters.getPrunedPartitions(relation, predicates)
+    if (partitions != null) {
+      for (partition <- partitions) {
+        prunedPartitionPaths.add(partition.getLocation.toString)
+      }
+      val details = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+      val validSegmentPaths = details.filter(segment =>
+        ((segment.getSegmentStatus == SegmentStatus.SUCCESS) ||
+          (segment.getSegmentStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS))
+          && segment.getSegmentFile != null).map(segment => segment.getSegmentFile)
+      val tableSegmentIndexes = IndexStoreManager.getInstance().getAllSegmentIndexes(
+        carbonTable.getTableId)
+      if (!tableSegmentIndexes.isEmpty) {
+        // clear invalid cache
+        for (segmentFilePathInCache <- tableSegmentIndexes.keySet().asScala) {
+          if (!validSegmentPaths.contains(segmentFilePathInCache)) {
+            // means invalid cache
+            tableSegmentIndexes.remove(segmentFilePathInCache)
+          }
+        }
+      }
+      // init and put absent the valid cache
+      for (validSegmentPath <- validSegmentPaths) {
+        if (tableSegmentIndexes.get(validSegmentPath) == null) {
+          val segmentIndexMeta = new SegmentIndexMeta(validSegmentPath)
+          tableSegmentIndexes.put(validSegmentPath, segmentIndexMeta)
+        }
+      }
+
+      val numThreads = Math.min(Math.max(validSegmentPaths.length, 1), 4)
+      val executorService = Executors.newFixedThreadPool(numThreads)
+      // to get the index files of valid segments from cache or scan through each segment
+      val futures = new java.util.ArrayList[Future[Long]]
+      for (segmentPath <- validSegmentPaths) {
+        futures.add(executorService.submit(new Callable[Long] {
+          override def call(): Long = {
+            var rowCountCurrentSeg: Long = 0
+            var indexFilesCurrentSeg: java.util.List[CarbonFile] = null;
+            // tableSegmentIndexes already init.
+            if (tableSegmentIndexes.get(segmentPath) != null &&
+              tableSegmentIndexes.get(segmentPath).getSegmentIndexFiles.size() > 0) {
+              indexFilesCurrentSeg =
+                tableSegmentIndexes.get(segmentPath).getSegmentIndexFiles
+            } else {
+              // read from seg file
+              val sfs = new SegmentFileStore(carbonTable.getTablePath, segmentPath)

Review comment:
       This will throw a NullPointerException if the segmentFileName is null 
for a segment, or `indexFilesCurrentSeg` will end up null at line number 191.
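
A small sketch of the kind of guard I have in mind, using hypothetical names (`IndexFileLookupSketch`, `indexFilesFor`) and plain strings in place of `CarbonFile`: the lookup never returns null, so the loop over the index files cannot fail on a segment without a segment file or without a cached entry.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Illustration only: hypothetical names, not CarbonData API.
final class IndexFileLookupSketch {

  // Returns the cached index files for a segment file, or an empty list when
  // the segment has no segment file or nothing is cached for it yet.
  static List<String> indexFilesFor(String segmentFileName,
      Map<String, List<String>> cache) {
    if (segmentFileName == null) {
      return Collections.<String>emptyList();
    }
    List<String> files = cache.get(segmentFileName);
    return files == null ? Collections.<String>emptyList() : files;
  }
}
```

An empty result only means "nothing usable here". The caller still has to decide whether to read the files from storage or to fall back to the regular count(*) flow for that segment.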

##########
File path: 
integration/spark/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
##########
@@ -108,4 +125,145 @@ case class CarbonCountStar(
     CarbonInputFormatUtil.setIndexJobIfConfigured(job.getConfiguration)
     (job, carbonInputFormat)
   }
+
+  // The detail of query flow as following for pure partition count star:
+  // Step 1. check whether it is pure partition count star by filter
+  // Step 2. read tablestatus to get all valid segments, remove the segment file cache of invalid
+  // segment and expired segment
+  // Step 3. use multi-thread to read segment files which not in cache and cache index files list
+  // of each segment into memory. If its index files already exist in cache, not required to
+  // read again.
+  // Step 4. use multi-thread to prune segment and partition to get pruned index file list, which
+  // can prune most index files and reduce the files num.
+  // Step 5. read the count from pruned index file directly and cache it, get from cache if exist
+  // in the index_file <-> rowCount map.
+  private def getRowCountPurePartitionPrune: Long = {
+    var rowCount: Long = 0
+    val prunedPartitionPaths = new java.util.HashSet[String]()
+    // Get the current partitions from table.
+    val partitions = CarbonFilters.getPrunedPartitions(relation, predicates)
+    if (partitions != null) {
+      for (partition <- partitions) {
+        prunedPartitionPaths.add(partition.getLocation.toString)
+      }
+      val details = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+      val validSegmentPaths = details.filter(segment =>
+        ((segment.getSegmentStatus == SegmentStatus.SUCCESS) ||
+          (segment.getSegmentStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS))
+          && segment.getSegmentFile != null).map(segment => segment.getSegmentFile)
+      val tableSegmentIndexes = IndexStoreManager.getInstance().getAllSegmentIndexes(
+        carbonTable.getTableId)
+      if (!tableSegmentIndexes.isEmpty) {
+        // clear invalid cache
+        for (segmentFilePathInCache <- tableSegmentIndexes.keySet().asScala) {
+          if (!validSegmentPaths.contains(segmentFilePathInCache)) {
+            // means invalid cache
+            tableSegmentIndexes.remove(segmentFilePathInCache)
+          }
+        }
+      }
+      // init and put absent the valid cache
+      for (validSegmentPath <- validSegmentPaths) {
+        if (tableSegmentIndexes.get(validSegmentPath) == null) {
+          val segmentIndexMeta = new SegmentIndexMeta(validSegmentPath)
+          tableSegmentIndexes.put(validSegmentPath, segmentIndexMeta)
+        }
+      }
+
+      val numThreads = Math.min(Math.max(validSegmentPaths.length, 1), 4)
+      val executorService = Executors.newFixedThreadPool(numThreads)
+      // to get the index files of valid segments from cache or scan through each segment
+      val futures = new java.util.ArrayList[Future[Long]]
+      for (segmentPath <- validSegmentPaths) {
+        futures.add(executorService.submit(new Callable[Long] {
+          override def call(): Long = {
+            var rowCountCurrentSeg: Long = 0
+            var indexFilesCurrentSeg: java.util.List[CarbonFile] = null;
+            // tableSegmentIndexes already init.
+            if (tableSegmentIndexes.get(segmentPath) != null &&
+              tableSegmentIndexes.get(segmentPath).getSegmentIndexFiles.size() > 0) {
+              indexFilesCurrentSeg =
+                tableSegmentIndexes.get(segmentPath).getSegmentIndexFiles
+            } else {
+              // read from seg file
+              val sfs = new SegmentFileStore(carbonTable.getTablePath, segmentPath)
+              val indexFileStore = new SegmentIndexFileStore
+              indexFilesCurrentSeg = indexFileStore.getAllIIndexOfSegment(sfs.getSegmentFile,
+                carbonTable.getTablePath, SegmentStatus.SUCCESS, false)
+              // tableSegmentIndexes already init and segmentIndexMeta not null.
+              val segmentIndexMeta = tableSegmentIndexes.get(segmentPath)
+              segmentIndexMeta.addSegmentIndexFiles(indexFilesCurrentSeg)
+              // cache all the index files of this segment
+              tableSegmentIndexes.put(segmentPath, segmentIndexMeta)
+            }
+
+            // use partition prune to get row count of each pruned index file and
+            // cache it for this segment
+            var prunedIndexFilesCurrentSeg: java.util.List[String] =
+            new java.util.ArrayList[String]()
+            for (indexFile <- indexFilesCurrentSeg.asScala) {
+              // check whether the index files located in pruned partition
+              val formattedPath = indexFile.getAbsolutePath
+                .replace("\\", "/")
+              if (prunedPartitionPaths.contains(
+                formattedPath.substring(0, formattedPath.lastIndexOf("/")))) {
+                prunedIndexFilesCurrentSeg.add(formattedPath)
+              }
+            }
+            // get the row count from cache or read it from pruned index files
+            var toReadIndexFiles: java.util.List[String] = new java.util.ArrayList[String]()
+            prunedIndexFilesCurrentSeg.asScala.foreach(prunedIndexFilePath =>
+              if (tableSegmentIndexes.get(segmentPath).getPrunedIndexFileToRowCountMap

Review comment:
       Please refactor this: assign 
`tableSegmentIndexes.get(segmentPath).getPrunedIndexFileToRowCountMap.get(prunedIndexFilePath)`
 to a variable, and add a setter in `SegmentIndexMeta` for updating the map.
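
Roughly what I mean, as a sketch with hypothetical names (`RowCountCacheSketch`, `SegmentIndexMetaSketch`, `getRowCount`, `setRowCount`, and the `readRowCount` callback are illustrative, not existing code): the cached value is looked up once into a local variable, and updates go through a setter on the meta object instead of reaching into its map.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.ToLongFunction;

// Illustration only: hypothetical names, not the classes in this PR.
final class RowCountCacheSketch {

  // Stand-in for SegmentIndexMeta with an accessor pair instead of an exposed map.
  static final class SegmentIndexMetaSketch {
    private final Map<String, Long> prunedIndexFileToRowCountMap = new ConcurrentHashMap<>();

    Long getRowCount(String indexFilePath) {
      return prunedIndexFileToRowCountMap.get(indexFilePath);
    }

    void setRowCount(String indexFilePath, long rowCount) {
      prunedIndexFileToRowCountMap.put(indexFilePath, rowCount);
    }
  }

  // Sums the row counts of the pruned index files, reading only the uncached ones.
  static long totalRowCount(SegmentIndexMetaSketch meta, List<String> prunedIndexFiles,
      ToLongFunction<String> readRowCount) {
    long total = 0L;
    for (String path : prunedIndexFiles) {
      Long cached = meta.getRowCount(path); // single lookup, kept in a local variable
      if (cached == null) {
        cached = readRowCount.applyAsLong(path);
        meta.setRowCount(path, cached);
      }
      total += cached;
    }
    return total;
  }
}
```

Calling `totalRowCount` twice with the same file list should invoke `readRowCount` only during the first call, since the second call finds every path in the cache.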

##########
File path: 
integration/spark/src/test/scala/org/apache/carbondata/index/lucene/LuceneFineGrainIndexSuite.scala
##########
@@ -531,7 +531,7 @@ class LuceneFineGrainIndexSuite extends QueryTest with BeforeAndAfterAll {
     sql("drop index dm on table index_test_overwrite")
   }
 
-  test("explain query with lucene index") {
+  ignore("explain query with lucene index") {

Review comment:
       Why was this test changed from `test` to `ignore`?



