This is an automated email from the ASF dual-hosted git repository.

indhumuthumurugesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new b74645e  [CARBONDATA-4148] Reindex failed when SI has stale carbonindexmerge file
b74645e is described below

commit b74645e353d850c8ab4bcb4a5723e9d23919da7b
Author: jack86596 <[email protected]>
AuthorDate: Fri Mar 12 15:07:40 2021 +0800

    [CARBONDATA-4148] Reindex failed when SI has stale carbonindexmerge file
    
    Why is this PR needed?
    Reindex fails with a FileNotFoundException when the SI table has a stale
    carbonindexmerge file. This is because SegmentFileStore.getIndexFiles stores a
    mapping from index file to index merge file, and when a stale carbon index
    merge file exists, the merge file name is not null. While merging index files,
    a new index merge file is created with the same name as before, in the same
    location. At the end of
    CarbonIndexFileMergeWriter.writeMergeIndexFileBasedOnSegmentFile, the carbon
    index files are deleted. Since the merge file name is stored in the indexFiles
    list, the newly created index merge file is deleted as well, which leads to
    the FileNotFoundException.
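    
    For illustration, a minimal, self-contained sketch of the failure mode using
    plain java.util types (the file names and the cleanup loop are hypothetical
    simplifications of the real code):
    
        import java.util.HashMap;
        import java.util.HashSet;
        import java.util.Map;
        import java.util.Set;
        
        public class StaleMergeFileSketch {
          public static void main(String[] args) {
            // Old getIndexFiles(): index file path -> merge file name
            // (non-null when a stale merge file exists)
            Map<String, String> indexFiles = new HashMap<>();
            indexFiles.put("/seg_0/0_1.carbonindex", "0_1.carbonindexmerge");
        
            // Re-running merge re-creates the merge file with the SAME name:
            Set<String> filesOnDisk = new HashSet<>();
            filesOnDisk.add("/seg_0/0_1.carbonindexmerge");
        
            // Cleanup deletes everything the map references, so the freshly
            // written merge file is removed too, and later reads fail with
            // FileNotFoundException.
            for (Map.Entry<String, String> e : indexFiles.entrySet()) {
              filesOnDisk.remove(e.getKey());
              if (e.getValue() != null) {
                filesOnDisk.remove("/seg_0/" + e.getValue());
              }
            }
            System.out.println(filesOnDisk); // [] - the new merge file is gone
          }
        }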
    
    What changes were proposed in this PR?
    1. SegmentFileStore.getIndexFiles stored a redundant mapping from index file
       to index merge file; it now returns only the set of index file paths.
    2. SegmentFileStore.getIndexOrMergeFiles returns both the index files and the
       index merge files, so the function name is misleading; rename it to
       getIndexAndMergeFiles (a usage sketch follows this list).
    3. CarbonLoaderUtil.getActiveExecutor actually gets an active node, so the
       function name is misleading; rename it to getActiveNode and replace
       "executor" with "node" throughout assignBlocksByDataLocality.
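    
    A short usage sketch of the changed signatures (construction mirrors the
    calls visible in the diff below; tablePath and segment are assumed to be in
    scope and the variable names are illustrative):
    
        SegmentFileStore store = new SegmentFileStore(tablePath, segment.getSegmentFileName());
        // getIndexFiles now returns only the index file paths:
        Set<String> indexFiles = store.getIndexFiles();
        // the renamed method returns index files AND merge files, keyed by
        // full path (it declares throws IOException):
        Map<String, String> indexAndMergeFiles = store.getIndexAndMergeFiles();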
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #4105
---
 .../carbondata/core/metadata/SegmentFileStore.java | 23 +++------
 .../TableStatusReadCommittedScope.java             |  2 +-
 .../apache/carbondata/core/util/SessionParams.java |  3 +-
 .../testsuite/secondaryindex/TestIndexRepair.scala | 13 ++++++
 .../apache/spark/sql/index/CarbonIndexUtil.scala   |  4 +-
 .../StandardPartitionTableCleanTestCase.scala      |  2 +-
 .../processing/util/CarbonLoaderUtil.java          | 54 ++++++++++------------
 7 files changed, 50 insertions(+), 51 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index 869cc5c..50ba335 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -831,8 +831,8 @@ public class SegmentFileStore {
    * Gets all index files from this segment
    * @return
    */
-  public Map<String, String> getIndexFiles() {
-    Map<String, String> indexFiles = new HashMap<>();
+  public Set<String> getIndexFiles() {
+    Set<String> indexFiles = new HashSet<>();
     if (segmentFile != null) {
      for (Map.Entry<String, FolderDetails> entry : getLocationMap().entrySet()) {
         String location = entry.getKey();
@@ -841,8 +841,7 @@ public class SegmentFileStore {
         }
        if (entry.getValue().status.equals(SegmentStatus.SUCCESS.getMessage())) {
           for (String indexFile : entry.getValue().getFiles()) {
-            indexFiles.put(location + CarbonCommonConstants.FILE_SEPARATOR + indexFile,
-                entry.getValue().mergeFileName);
+            indexFiles.add(location + CarbonCommonConstants.FILE_SEPARATOR + indexFile);
           }
         }
       }
@@ -854,7 +853,7 @@ public class SegmentFileStore {
    * Gets all index files from this segment
    * @return
    */
-  public Map<String, String> getIndexOrMergeFiles() throws IOException {
+  public Map<String, String> getIndexAndMergeFiles() throws IOException {
     Map<String, String> indexFiles = new HashMap<>();
     if (segmentFile != null) {
      for (Map.Entry<String, FolderDetails> entry : getLocationMap().entrySet()) {
@@ -898,17 +897,9 @@ public class SegmentFileStore {
    * @return
    */
   public List<CarbonFile> getIndexCarbonFiles() {
-    Map<String, String> indexFiles = getIndexFiles();
-    Set<String> files = new HashSet<>();
-    for (Map.Entry<String, String> entry: indexFiles.entrySet()) {
-      Path path = new Path(entry.getKey());
-      files.add(entry.getKey());
-      if (entry.getValue() != null) {
-        files.add(new Path(path.getParent(), entry.getValue()).toString());
-      }
-    }
+    Set<String> indexFiles = getIndexFiles();
     List<CarbonFile> carbonFiles = new ArrayList<>();
-    for (String indexFile : files) {
+    for (String indexFile : indexFiles) {
       CarbonFile carbonFile = FileFactory.getCarbonFile(indexFile);
       if (carbonFile.exists()) {
         carbonFiles.add(carbonFile);
@@ -1357,7 +1348,7 @@ public class SegmentFileStore {
     } else {
       SegmentFileStore segmentFileStore =
           new SegmentFileStore(tablePath, segment.getSegmentFileName());
-      indexFiles = segmentFileStore.getIndexOrMergeFiles().keySet();
+      indexFiles = segmentFileStore.getIndexAndMergeFiles().keySet();
     }
     return indexFiles;
   }
diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
index 5413cec..cd3992b 100644
--- a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
+++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
@@ -86,7 +86,7 @@ public class TableStatusReadCommittedScope implements ReadCommittedScope {
     } else {
       SegmentFileStore fileStore =
           new SegmentFileStore(identifier.getTablePath(), segment.getSegmentFileName());
-      indexFiles = fileStore.getIndexOrMergeFiles();
+      indexFiles = fileStore.getIndexAndMergeFiles();
       if (fileStore.getSegmentFile() != null) {
        segment.setSegmentMetaDataInfo(fileStore.getSegmentFile().getSegmentMetaDataInfo());
       }
diff --git a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
index aa9abfe..b62d83b 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
@@ -24,7 +24,6 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.carbondata.common.constants.LoggerAction;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.cache.CacheProvider;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
 import org.apache.carbondata.core.exception.InvalidConfigurationException;
@@ -49,7 +48,7 @@ import org.apache.log4j.Logger;
 public class SessionParams implements Serializable, Cloneable {
 
   private static final Logger LOGGER =
-      LogServiceFactory.getLogService(CacheProvider.class.getName());
+      LogServiceFactory.getLogService(SessionParams.class.getName());
   private static final long serialVersionUID = -7801994600594915264L;
 
   private Map<String, String> sProps;
diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexRepair.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexRepair.scala
index 8b5bf3f..af615cb 100644
--- a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexRepair.scala
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexRepair.scala
@@ -119,6 +119,19 @@ class TestIndexRepair extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists maintable")
   }
 
+  test("reindex command with stale files") {
+    sql("drop table if exists maintable")
+    sql("CREATE TABLE maintable(a INT, b STRING, c STRING) stored as 
carbondata")
+    sql("CREATE INDEX indextable1 on table maintable(c) as 'carbondata'")
+    sql("INSERT INTO maintable SELECT 1,'string1', 'string2'")
+    sql("INSERT INTO maintable SELECT 1,'string1', 'string2'")
+    sql("INSERT INTO maintable SELECT 1,'string1', 'string2'")
+    sql("DELETE FROM TABLE INDEXTABLE1 WHERE SEGMENT.ID IN(0,1,2)")
+    sql("REINDEX INDEX TABLE indextable1 ON MAINTABLE WHERE SEGMENT.ID IN 
(0,1)")
+    assert(sql("select * from maintable where c = 'string2'").count() == 2)
+    sql("drop table if exists maintable")
+  }
+
   test("insert command after deleting segments from SI table") {
     sql("drop table if exists maintable")
     sql("CREATE TABLE maintable(a INT, b STRING, c STRING) stored as 
carbondata")
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
index e24f52c..094be92 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
@@ -502,7 +502,7 @@ object CarbonIndexUtil {
                 if (null == detail || detail.length == 0) {
                   val newDetails = new LoadMetadataDetails
                   newDetails.setLoadName(metadataDetail)
-                  LOGGER.error(
+                  LOGGER.info(
                     "Added in SILoadFailedSegment " + newDetails.getLoadName + 
" for SI" +
                     " table " + indexTableName + "." + 
carbonTable.getTableName)
                   failedLoadMetadataDetails.add(newDetails)
@@ -524,7 +524,7 @@ object CarbonIndexUtil {
                         LockUsage.LOCK)
                    if (segmentLockOfProbableOnCompactionSeg.lockWithRetries()) {
                       segmentLocks += segmentLockOfProbableOnCompactionSeg
-                      LOGGER.error(
+                      LOGGER.info(
                         "Added in SILoadFailedSegment " + 
detail(0).getLoadName + " for SI "
                         + "table " + indexTableName + "." + 
carbonTable.getTableName)
                       failedLoadMetadataDetails.add(detail(0))
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala
index cd87bc8..9342828 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala
@@ -69,7 +69,7 @@ class StandardPartitionTableCleanTestCase extends QueryTest with BeforeAndAfterA
       CarbonTablePath.getMetadataPath(carbonTable.getTablePath))
     val segLoad = details.find(_.getLoadName.equals(segmentId)).get
    val seg = new SegmentFileStore(carbonTable.getTablePath, segLoad.getSegmentFile)
-    assert(seg.getIndexOrMergeFiles.size == indexes)
+    assert(seg.getIndexAndMergeFiles.size == indexes)
   }
 
   test("clean up partition table for int partition column") {
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 32c5ee5..1d35502 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -896,19 +896,19 @@ public final class CarbonLoaderUtil {
       Collections.sort(inputNode2Blocks);
     }
 
-    Map<String, Integer> executor2Idx = new HashMap<>();
+    Map<String, Integer> node2Idx = new HashMap<>();
     for (NodeMultiBlockRelation nodeMultiBlockRelation : inputNode2Blocks) {
       String nodeName = nodeMultiBlockRelation.getNode();
       // assign the block to the node only if the node is active
-      String activeExecutor = nodeName;
+      String activeNode = nodeName;
       if (null != activeNodes) {
-        activeExecutor = getActiveExecutor(activeNodes, nodeName);
-        if (null == activeExecutor) {
+        activeNode = getActiveNode(activeNodes, nodeName);
+        if (null == activeNode) {
           continue;
         }
       }
       if (LOGGER.isDebugEnabled()) {
-        LOGGER.debug("First Assignment iteration: assign for executor: " + 
activeExecutor);
+        LOGGER.debug("First Assignment iteration: assign for executor: " + 
activeNode);
       }
 
      List<Distributable> blocksInThisNode = nodeMultiBlockRelation.getBlocks();
@@ -925,17 +925,17 @@ public final class CarbonLoaderUtil {
           continue;
         }
         // this is the first time to add block to this node, initialize it
-        if (!executor2Idx.containsKey(activeExecutor)) {
-          Integer idx = executor2Idx.size();
-          outputNode2Blocks.add(idx, new NodeMultiBlockRelation(activeExecutor,
+        if (!node2Idx.containsKey(activeNode)) {
+          Integer idx = node2Idx.size();
+          outputNode2Blocks.add(idx, new NodeMultiBlockRelation(activeNode,
              new ArrayList<Distributable>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE)));
-          executor2Idx.put(activeExecutor, idx);
+          node2Idx.put(activeNode, idx);
         }
 
         // assign this block to this node if node has capacity left
        if (BlockAssignmentStrategy.BLOCK_NUM_FIRST == blockAssignmentStrategy) {
           if (nodeCapacity < expectedSizePerNode) {
-            Integer idx = executor2Idx.get(activeExecutor);
+            Integer idx = node2Idx.get(activeNode);
             List<Distributable> infos = outputNode2Blocks.get(idx).getBlocks();
             infos.add(block);
             nodeCapacity++;
@@ -943,7 +943,7 @@ public final class CarbonLoaderUtil {
               try {
                 LOGGER.debug("First Assignment iteration: block("
                     + StringUtils.join(block.getLocations(), ", ")
-                    + ")-->" + activeExecutor);
+                    + ")-->" + activeNode);
               } catch (IOException e) {
                 LOGGER.error(e.getMessage(), e);
               }
@@ -960,14 +960,14 @@ public final class CarbonLoaderUtil {
           // be assigned in the last RoundRobin iteration.
           if (nodeCapacity == 0 || nodeCapacity < expectedSizePerNode) {
            if (nodeCapacity == 0 || nodeCapacity + thisBlockSize <= expectedSizePerNode * 1.05D) {
-              Integer idx = executor2Idx.get(activeExecutor);
+              Integer idx = node2Idx.get(activeNode);
              List<Distributable> blocks = outputNode2Blocks.get(idx).getBlocks();
               blocks.add(block);
               nodeCapacity += thisBlockSize;
               if (LOGGER.isDebugEnabled()) {
                 LOGGER.debug(
                     "First Assignment iteration: " + ((TableBlockInfo) 
block).getFilePath() + '-'
-                        + ((TableBlockInfo) block).getBlockLength() + "-->" + 
activeExecutor);
+                        + ((TableBlockInfo) block).getBlockLength() + "-->" + 
activeNode);
               }
               remainingBlocks.remove(block);
             }
@@ -1038,34 +1038,30 @@ public final class CarbonLoaderUtil {
    *
    * @param activeNode
    * @param nodeName
-   * @return returns true if active else false.
+   * @return hostName or hostAddress if node is active
    */
-  private static String getActiveExecutor(List activeNode, String nodeName) {
-    boolean isActiveNode = activeNode.contains(nodeName);
-    if (isActiveNode) {
-      return nodeName;
-    }
-    //if localhost then retrieve the localhost name then do the check
-    else if (nodeName.equals("localhost")) {
-      try {
+  private static String getActiveNode(List<String> activeNode, String nodeName) {
+    try {
+      boolean isActiveNode = activeNode.contains(nodeName);
+      if (isActiveNode) {
+        return nodeName;
+      }
+      //if localhost then retrieve the localhost name then do the check
+      else if (nodeName.equals("localhost")) {
         String hostName = InetAddress.getLocalHost().getHostName();
         isActiveNode = activeNode.contains(hostName);
         if (isActiveNode) {
           return hostName;
         }
-      } catch (UnknownHostException ue) {
-        isActiveNode = false;
-      }
-    } else {
-      try {
+      } else {
         String hostAddress = InetAddress.getByName(nodeName).getHostAddress();
         isActiveNode = activeNode.contains(hostAddress);
         if (isActiveNode) {
           return hostAddress;
         }
-      } catch (UnknownHostException ue) {
-        isActiveNode = false;
       }
+    } catch (UnknownHostException ue) {
+      return null;
     }
     return null;
   }
