This is an automated email from the ASF dual-hosted git repository.

kangrong pushed a commit to branch f_index_dev
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/f_index_dev by this push:
     new c0744f1  fix concurrent bug temp
c0744f1 is described below

commit c0744f13f7fffacf6349d007b8a4167ef946d385
Author: kr11 <3095717866.com>
AuthorDate: Sun May 16 16:05:59 2021 +0800

    fix concurrent bug temp
---
 .../resources/conf/iotdb-engine.properties         |  2 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  2 +-
 .../iotdb/db/engine/flush/MemTableFlushTask.java   |  3 +-
 .../org/apache/iotdb/db/index/IndexManager.java    |  3 +
 .../org/apache/iotdb/db/index/IndexProcessor.java  | 64 ++++++++++++++--------
 .../iotdb/db/index/algorithm/RTreeIndex.java       |  6 +-
 .../iotdb/db/index/algorithm/elb/ELBIndex.java     |  6 +-
 .../iotdb/db/index/algorithm/mmhh/MMHHIndex.java   | 11 +++-
 .../iotdb/db/index/algorithm/rtree/RTree.java      | 56 +++++++++++--------
 9 files changed, 96 insertions(+), 57 deletions(-)

diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index a089406..3b9bf5a 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -661,7 +661,7 @@ default_index_window_range=10
 index_buffer_size=134217728
 
 # the max returned size of index query result
-max_index_query_result_size=5
+max_index_query_result_size=100
 
 # the default value of max size of the index unusable segments, used in SubMatchIndexUsability
 default_max_size_of_unusable_segments=20
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 07b56e3..af670f8 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -644,7 +644,7 @@ public class IoTDBConfig {
   /** the number of virtual storage groups per user-defined storage group */
   private int virtualStorageGroupNum = 1;
 
-  private int maxIndexQueryResultSize = 20;
+  private int maxIndexQueryResultSize = 100;
 
   public IoTDBConfig() {
     // empty constructor
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index 21b0e3c..5efbd70 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -162,7 +162,8 @@ public class MemTableFlushTask {
       try {
         indexFlushTask.endFlush();
       } catch (Exception e) {
-        LOGGER.error("meet Exception in endFlush, not affect the memtable flushing", e);
+        LOGGER.info("meet Exception in endFlush, not affect the memtable flushing");
+        //        System.out.println("endFlush has finished");
       }
     }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/index/IndexManager.java b/server/src/main/java/org/apache/iotdb/db/index/IndexManager.java
index 03f2e02..0216565 100644
--- a/server/src/main/java/org/apache/iotdb/db/index/IndexManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/index/IndexManager.java
@@ -204,6 +204,7 @@ public class IndexManager implements IndexManagerMBean, IService {
     IndexStatManager.totalQueryCost = current;
     IndexStatManager.latestTimestamp = current + timeout * 1000_000;
     logger.info("index query time out has been set to {} sec", timeout);
+    System.out.println("index query time out has been set to sec:" + timeout);
     if (paths.size() != 1) {
       throw new QueryIndexException("Index allows to query only one path");
     }
@@ -277,6 +278,7 @@ public class IndexManager implements IndexManagerMBean, IService {
       return;
     }
     logger.info("IndexManager starts...");
+    System.out.println("IndexManager starts...");
     IndexBuildTaskPoolManager.getInstance().start();
     try {
       JMXService.registerMBean(this, ServiceType.INDEX_SERVICE.getJmxName());
@@ -287,6 +289,7 @@ public class IndexManager implements IndexManagerMBean, IService {
       throw new StartupException(e);
     }
     logger.info("IndexManager starts successfully");
+    System.out.println("IndexManager starts successfully");
   }
 
   public Map<PartialPath, Map<IndexType, IndexInfo>> getIndexInfos(PartialPath prefixPath) {
diff --git a/server/src/main/java/org/apache/iotdb/db/index/IndexProcessor.java b/server/src/main/java/org/apache/iotdb/db/index/IndexProcessor.java
index 9795442..4548674 100644
--- a/server/src/main/java/org/apache/iotdb/db/index/IndexProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/index/IndexProcessor.java
@@ -398,28 +398,48 @@ public class IndexProcessor implements Comparable<IndexProcessor> {
               numIndexBuildTasks.decrementAndGet();
               return;
             }
-            Runnable buildTask =
-                () -> {
-                  try {
-                    indexLockMap.get(indexType).writeLock().lock();
-                    IndexFeatureExtractor extractor = index.startFlushTask(path, tvList);
-                    while (extractor.hasNext()) {
-                      extractor.processNext();
-                      index.buildNext();
-                    }
-                    index.endFlushTask();
-                  } catch (IndexManagerException e) {
-                    // Give up the following data, but the previously serialized chunk will not be
-                    // affected.
-                    logger.error("build index failed", e);
-                  } catch (RuntimeException e) {
-                    logger.error("RuntimeException", e);
-                  } finally {
-                    numIndexBuildTasks.decrementAndGet();
-                    indexLockMap.get(indexType).writeLock().unlock();
-                  }
-                };
-            indexBuildPoolManager.submit(buildTask);
+            //            Runnable buildTask =
+            //                () -> {
+            //                  try {
+            //                    indexLockMap.get(indexType).writeLock().lock();
+            //                    IndexFeatureExtractor extractor = index.startFlushTask(path,
+            // tvList);
+            //                    while (extractor.hasNext()) {
+            //                      extractor.processNext();
+            //                      index.buildNext();
+            //                    }
+            //                    index.endFlushTask();
+            //                  } catch (IndexManagerException e) {
+            //                    // Give up the following data, but the previously serialized chunk
+            // will not be
+            //                    // affected.
+            //                    logger.error("build index failed", e);
+            //                  } catch (RuntimeException e) {
+            //                    logger.error("RuntimeException", e);
+            //                  } finally {
+            //                    numIndexBuildTasks.decrementAndGet();
+            //                    indexLockMap.get(indexType).writeLock().unlock();
+            //                  }
+            //                };
+            try {
+              indexLockMap.get(indexType).writeLock().lock();
+              IndexFeatureExtractor extractor = index.startFlushTask(path, tvList);
+              while (extractor.hasNext()) {
+                extractor.processNext();
+                index.buildNext();
+              }
+              index.endFlushTask();
+            } catch (IndexManagerException e) {
+              // Give up the following data, but the previously serialized chunk will not be
+              // affected.
+              logger.error("build index failed", e);
+            } catch (RuntimeException e) {
+              logger.error("RuntimeException", e);
+            } finally {
+              numIndexBuildTasks.decrementAndGet();
+              indexLockMap.get(indexType).writeLock().unlock();
+            }
+            //            indexBuildPoolManager.submit(buildTask);
           });
     } finally {
       lock.writeLock().unlock();
diff --git a/server/src/main/java/org/apache/iotdb/db/index/algorithm/RTreeIndex.java b/server/src/main/java/org/apache/iotdb/db/index/algorithm/RTreeIndex.java
index b9b7e1f..202489d 100644
--- a/server/src/main/java/org/apache/iotdb/db/index/algorithm/RTreeIndex.java
+++ b/server/src/main/java/org/apache/iotdb/db/index/algorithm/RTreeIndex.java
@@ -176,8 +176,8 @@ public abstract class RTreeIndex extends IoTDBIndex {
     logger.info("RTreeIndex {} starts serialization", indexSeries);
     //    logger.info("RTreeIndex RTree to serialized: {}", rTree.toString().substring(0, 10));
     logger.info("RTreeIndex RTree to serialized: {}", rTree);
-    logger.info("RTreeIndex RTree to serialized: {}", rTree.toDetailedString());
-    logger.info("Serialize InvolvedSet: {}", involvedPathSet.size());
+    //    logger.info("RTreeIndex RTree to serialized: {}", rTree.toDetailedString());
+    //    logger.info("Serialize InvolvedSet: {}", involvedPathSet.size());
     try (OutputStream outputStream = new FileOutputStream(featureFile)) {
       // out is outputStream exactly. It seems redundant, but it would be really weird if the second
       // parameter "serializeItem" doesn't input an outputStream.
@@ -193,7 +193,7 @@ public abstract class RTreeIndex extends IoTDBIndex {
           });
       System.out.println("rtree file size: " + FileUtils.sizeOf(featureFile));
     } catch (IOException e) {
-      logger.error("Error when serialize router. Given up.", e);
+      logger.error("Error when serialize RTree. Given up.", e);
     }
     logger.info("RTreeIndex {} finishes serialization", indexSeries);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/index/algorithm/elb/ELBIndex.java b/server/src/main/java/org/apache/iotdb/db/index/algorithm/elb/ELBIndex.java
index 15c395d..c57d3e3 100644
--- a/server/src/main/java/org/apache/iotdb/db/index/algorithm/elb/ELBIndex.java
+++ b/server/src/main/java/org/apache/iotdb/db/index/algorithm/elb/ELBIndex.java
@@ -186,7 +186,9 @@ public class ELBIndex extends IoTDBIndex {
     logger.info("ELBIndex {} start serialization", indexSeries);
     try (OutputStream outputStream = new FileOutputStream(featureFile)) {
       ReadWriteIOUtils.write(windowBlockFeatures.size(), outputStream);
-      for (ELBWindowBlockFeature features : windowBlockFeatures) {
+      int len = windowBlockFeatures.size();
+      for (int i = 0; i < len; i++) {
+        ELBWindowBlockFeature features = windowBlockFeatures.get(i);
         ReadWriteIOUtils.write(features.startTime, outputStream);
         ReadWriteIOUtils.write(features.endTime, outputStream);
         ReadWriteIOUtils.write(features.feature, outputStream);
@@ -197,7 +199,7 @@ public class ELBIndex extends IoTDBIndex {
           String.format(
               "calc size: %d=4L + block_size(%d) * (8 + 8 + 8)",
               expectSize, windowBlockFeatures.size()));
-      System.out.println("hashtable file size: " + FileUtils.sizeOf(featureFile));
+      System.out.println("elb file size: " + FileUtils.sizeOf(featureFile));
     } catch (IOException e) {
       logger.error("Error when serialize router. Given up.", e);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/index/algorithm/mmhh/MMHHIndex.java b/server/src/main/java/org/apache/iotdb/db/index/algorithm/mmhh/MMHHIndex.java
index 474cfa7..499051c 100644
--- a/server/src/main/java/org/apache/iotdb/db/index/algorithm/mmhh/MMHHIndex.java
+++ b/server/src/main/java/org/apache/iotdb/db/index/algorithm/mmhh/MMHHIndex.java
@@ -60,6 +60,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.ConcurrentModificationException;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -218,8 +219,9 @@ public class MMHHIndex extends IoTDBIndex {
         List<Long> bucket = entry.getValue();
         ReadWriteIOUtils.write(k, outputStream);
         ReadWriteIOUtils.write(bucket.size(), outputStream);
-        for (Long v : bucket) {
-          ReadWriteIOUtils.write(v, outputStream);
+        int len = bucket.size();
+        for (int i = 0; i < len; i++) {
+          ReadWriteIOUtils.write(bucket.get(i), outputStream);
         }
       }
       System.out.println("hashtable bucket size: " + hashLookupTable.size());
@@ -230,7 +232,10 @@ public class MMHHIndex extends IoTDBIndex {
               expectSize, hashLookupTable.size(), itemSize));
       System.out.println("hashtable file size: " + FileUtils.sizeOf(featureFile));
     } catch (IOException e) {
-      logger.error("Error when serialize router. Given up.", e);
+      logger.error("Error when serialize MMHH. Given up.", e);
+    } catch (ConcurrentModificationException e2) {
+      System.out.println("finished");
+      throw e2;
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/index/algorithm/rtree/RTree.java b/server/src/main/java/org/apache/iotdb/db/index/algorithm/rtree/RTree.java
index ae62f45..2b0cfbe 100644
--- a/server/src/main/java/org/apache/iotdb/db/index/algorithm/rtree/RTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/index/algorithm/rtree/RTree.java
@@ -37,6 +37,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.ConcurrentModificationException;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.PriorityQueue;
@@ -408,32 +409,39 @@ public class RTree<T> {
   private void serialize(
       RNode node, OutputStream outputStream, BiConsumer<T, OutputStream> serializeItem)
       throws IOException {
-    if (node instanceof Item) {
-      ReadWriteIOUtils.write(ITEM, outputStream);
-    } else if (node.isLeaf) {
-      ReadWriteIOUtils.write(LEAF_NODE, outputStream);
-    } else {
-      ReadWriteIOUtils.write(INNER_NODE, outputStream);
-    }
-    for (float lb : node.lbs) {
-      ReadWriteIOUtils.write(lb, outputStream);
-    }
-    if (!(node instanceof Item)) {
-      for (float ub : node.ubs) {
-        ReadWriteIOUtils.write(ub, outputStream);
+    try {
+      if (node instanceof Item) {
+        ReadWriteIOUtils.write(ITEM, outputStream);
+      } else if (node.isLeaf) {
+        ReadWriteIOUtils.write(LEAF_NODE, outputStream);
+      } else {
+        ReadWriteIOUtils.write(INNER_NODE, outputStream);
       }
-    }
-    if (node instanceof Item) {
-      T value = ((Item<T>) node).v;
-      //      ReadWriteIOUtils.write(value.toString(), outputStream);
-      // only the series id
-      serializeItem.accept(value, outputStream);
-    } else {
-      // write child
-      ReadWriteIOUtils.write(node.getChildren().size(), outputStream);
-      for (RNode child : node.getChildren()) {
-        serialize(child, outputStream, serializeItem);
+      for (float lb : node.lbs) {
+        ReadWriteIOUtils.write(lb, outputStream);
+      }
+      if (!(node instanceof Item)) {
+        for (float ub : node.ubs) {
+          ReadWriteIOUtils.write(ub, outputStream);
+        }
+      }
+      if (node instanceof Item) {
+        T value = ((Item<T>) node).v;
+        //      ReadWriteIOUtils.write(value.toString(), outputStream);
+        // only the series id
+        serializeItem.accept(value, outputStream);
+      } else {
+        // write child
+        ReadWriteIOUtils.write(node.getChildren().size(), outputStream);
+        List<RNode> children = node.getChildren();
+        int len = children.size();
+        for (int i = 0; i < len; i++) {
+          serialize(children.get(i), outputStream, serializeItem);
+        }
       }
+    } catch (ConcurrentModificationException e) {
+      System.out.println("finished");
+      throw e;
     }
   }
 

Reply via email to