This is an automated email from the ASF dual-hosted git repository.
kangrong pushed a commit to branch f_index_dev
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/f_index_dev by this push:
new c0744f1 fix concurrent bug temp
c0744f1 is described below
commit c0744f13f7fffacf6349d007b8a4167ef946d385
Author: kr11 <3095717866.com>
AuthorDate: Sun May 16 16:05:59 2021 +0800
fix concurrent bug temp
---
.../resources/conf/iotdb-engine.properties | 2 +-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +-
.../iotdb/db/engine/flush/MemTableFlushTask.java | 3 +-
.../org/apache/iotdb/db/index/IndexManager.java | 3 +
.../org/apache/iotdb/db/index/IndexProcessor.java | 64 ++++++++++++++--------
.../iotdb/db/index/algorithm/RTreeIndex.java | 6 +-
.../iotdb/db/index/algorithm/elb/ELBIndex.java | 6 +-
.../iotdb/db/index/algorithm/mmhh/MMHHIndex.java | 11 +++-
.../iotdb/db/index/algorithm/rtree/RTree.java | 56 +++++++++++--------
9 files changed, 96 insertions(+), 57 deletions(-)
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties
b/server/src/assembly/resources/conf/iotdb-engine.properties
index a089406..3b9bf5a 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -661,7 +661,7 @@ default_index_window_range=10
index_buffer_size=134217728
# the max returned size of index query result
-max_index_query_result_size=5
+max_index_query_result_size=100
# the default value of max size of the index unusable segments, used in
SubMatchIndexUsability
default_max_size_of_unusable_segments=20
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 07b56e3..af670f8 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -644,7 +644,7 @@ public class IoTDBConfig {
/** the number of virtual storage groups per user-defined storage group */
private int virtualStorageGroupNum = 1;
- private int maxIndexQueryResultSize = 20;
+ private int maxIndexQueryResultSize = 100;
public IoTDBConfig() {
// empty constructor
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index 21b0e3c..5efbd70 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -162,7 +162,8 @@ public class MemTableFlushTask {
try {
indexFlushTask.endFlush();
} catch (Exception e) {
- LOGGER.error("meet Exception in endFlush, not affect the memtable
flushing", e);
+ LOGGER.info("meet Exception in endFlush, not affect the memtable
flushing");
+ // System.out.println("endFlush has finished");
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/index/IndexManager.java
b/server/src/main/java/org/apache/iotdb/db/index/IndexManager.java
index 03f2e02..0216565 100644
--- a/server/src/main/java/org/apache/iotdb/db/index/IndexManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/index/IndexManager.java
@@ -204,6 +204,7 @@ public class IndexManager implements IndexManagerMBean,
IService {
IndexStatManager.totalQueryCost = current;
IndexStatManager.latestTimestamp = current + timeout * 1000_000;
logger.info("index query time out has been set to {} sec", timeout);
+ System.out.println("index query time out has been set to sec:" + timeout);
if (paths.size() != 1) {
throw new QueryIndexException("Index allows to query only one path");
}
@@ -277,6 +278,7 @@ public class IndexManager implements IndexManagerMBean,
IService {
return;
}
logger.info("IndexManager starts...");
+ System.out.println("IndexManager starts...");
IndexBuildTaskPoolManager.getInstance().start();
try {
JMXService.registerMBean(this, ServiceType.INDEX_SERVICE.getJmxName());
@@ -287,6 +289,7 @@ public class IndexManager implements IndexManagerMBean,
IService {
throw new StartupException(e);
}
logger.info("IndexManager starts successfully");
+ System.out.println("IndexManager starts successfully");
}
public Map<PartialPath, Map<IndexType, IndexInfo>> getIndexInfos(PartialPath
prefixPath) {
diff --git a/server/src/main/java/org/apache/iotdb/db/index/IndexProcessor.java
b/server/src/main/java/org/apache/iotdb/db/index/IndexProcessor.java
index 9795442..4548674 100644
--- a/server/src/main/java/org/apache/iotdb/db/index/IndexProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/index/IndexProcessor.java
@@ -398,28 +398,48 @@ public class IndexProcessor implements
Comparable<IndexProcessor> {
numIndexBuildTasks.decrementAndGet();
return;
}
- Runnable buildTask =
- () -> {
- try {
- indexLockMap.get(indexType).writeLock().lock();
- IndexFeatureExtractor extractor =
index.startFlushTask(path, tvList);
- while (extractor.hasNext()) {
- extractor.processNext();
- index.buildNext();
- }
- index.endFlushTask();
- } catch (IndexManagerException e) {
- // Give up the following data, but the previously
serialized chunk will not be
- // affected.
- logger.error("build index failed", e);
- } catch (RuntimeException e) {
- logger.error("RuntimeException", e);
- } finally {
- numIndexBuildTasks.decrementAndGet();
- indexLockMap.get(indexType).writeLock().unlock();
- }
- };
- indexBuildPoolManager.submit(buildTask);
+ // Runnable buildTask =
+ // () -> {
+ // try {
+ //
indexLockMap.get(indexType).writeLock().lock();
+ // IndexFeatureExtractor extractor =
index.startFlushTask(path,
+ // tvList);
+ // while (extractor.hasNext()) {
+ // extractor.processNext();
+ // index.buildNext();
+ // }
+ // index.endFlushTask();
+ // } catch (IndexManagerException e) {
+ // // Give up the following data, but the
previously serialized chunk
+ // will not be
+ // // affected.
+ // logger.error("build index failed", e);
+ // } catch (RuntimeException e) {
+ // logger.error("RuntimeException", e);
+ // } finally {
+ // numIndexBuildTasks.decrementAndGet();
+ //
indexLockMap.get(indexType).writeLock().unlock();
+ // }
+ // };
+ try {
+ indexLockMap.get(indexType).writeLock().lock();
+ IndexFeatureExtractor extractor = index.startFlushTask(path,
tvList);
+ while (extractor.hasNext()) {
+ extractor.processNext();
+ index.buildNext();
+ }
+ index.endFlushTask();
+ } catch (IndexManagerException e) {
+ // Give up the following data, but the previously serialized
chunk will not be
+ // affected.
+ logger.error("build index failed", e);
+ } catch (RuntimeException e) {
+ logger.error("RuntimeException", e);
+ } finally {
+ numIndexBuildTasks.decrementAndGet();
+ indexLockMap.get(indexType).writeLock().unlock();
+ }
+ // indexBuildPoolManager.submit(buildTask);
});
} finally {
lock.writeLock().unlock();
diff --git
a/server/src/main/java/org/apache/iotdb/db/index/algorithm/RTreeIndex.java
b/server/src/main/java/org/apache/iotdb/db/index/algorithm/RTreeIndex.java
index b9b7e1f..202489d 100644
--- a/server/src/main/java/org/apache/iotdb/db/index/algorithm/RTreeIndex.java
+++ b/server/src/main/java/org/apache/iotdb/db/index/algorithm/RTreeIndex.java
@@ -176,8 +176,8 @@ public abstract class RTreeIndex extends IoTDBIndex {
logger.info("RTreeIndex {} starts serialization", indexSeries);
// logger.info("RTreeIndex RTree to serialized: {}",
rTree.toString().substring(0, 10));
logger.info("RTreeIndex RTree to serialized: {}", rTree);
- logger.info("RTreeIndex RTree to serialized: {}",
rTree.toDetailedString());
- logger.info("Serialize InvolvedSet: {}", involvedPathSet.size());
+ // logger.info("RTreeIndex RTree to serialized: {}",
rTree.toDetailedString());
+ // logger.info("Serialize InvolvedSet: {}", involvedPathSet.size());
try (OutputStream outputStream = new FileOutputStream(featureFile)) {
// out is outputStream exactly. It seems redundant, but it would be
really weird if the second
// parameter "serializeItem" doesn't input an outputStream.
@@ -193,7 +193,7 @@ public abstract class RTreeIndex extends IoTDBIndex {
});
System.out.println("rtree file size: " + FileUtils.sizeOf(featureFile));
} catch (IOException e) {
- logger.error("Error when serialize router. Given up.", e);
+ logger.error("Error when serialize RTree. Given up.", e);
}
logger.info("RTreeIndex {} finishes serialization", indexSeries);
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/index/algorithm/elb/ELBIndex.java
b/server/src/main/java/org/apache/iotdb/db/index/algorithm/elb/ELBIndex.java
index 15c395d..c57d3e3 100644
--- a/server/src/main/java/org/apache/iotdb/db/index/algorithm/elb/ELBIndex.java
+++ b/server/src/main/java/org/apache/iotdb/db/index/algorithm/elb/ELBIndex.java
@@ -186,7 +186,9 @@ public class ELBIndex extends IoTDBIndex {
logger.info("ELBIndex {} start serialization", indexSeries);
try (OutputStream outputStream = new FileOutputStream(featureFile)) {
ReadWriteIOUtils.write(windowBlockFeatures.size(), outputStream);
- for (ELBWindowBlockFeature features : windowBlockFeatures) {
+ int len = windowBlockFeatures.size();
+ for (int i = 0; i < len; i++) {
+ ELBWindowBlockFeature features = windowBlockFeatures.get(i);
ReadWriteIOUtils.write(features.startTime, outputStream);
ReadWriteIOUtils.write(features.endTime, outputStream);
ReadWriteIOUtils.write(features.feature, outputStream);
@@ -197,7 +199,7 @@ public class ELBIndex extends IoTDBIndex {
String.format(
"calc size: %d=4L + block_size(%d) * (8 + 8 + 8)",
expectSize, windowBlockFeatures.size()));
- System.out.println("hashtable file size: " +
FileUtils.sizeOf(featureFile));
+ System.out.println("elb file size: " + FileUtils.sizeOf(featureFile));
} catch (IOException e) {
logger.error("Error when serialize router. Given up.", e);
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/index/algorithm/mmhh/MMHHIndex.java
b/server/src/main/java/org/apache/iotdb/db/index/algorithm/mmhh/MMHHIndex.java
index 474cfa7..499051c 100644
---
a/server/src/main/java/org/apache/iotdb/db/index/algorithm/mmhh/MMHHIndex.java
+++
b/server/src/main/java/org/apache/iotdb/db/index/algorithm/mmhh/MMHHIndex.java
@@ -60,6 +60,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
+import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -218,8 +219,9 @@ public class MMHHIndex extends IoTDBIndex {
List<Long> bucket = entry.getValue();
ReadWriteIOUtils.write(k, outputStream);
ReadWriteIOUtils.write(bucket.size(), outputStream);
- for (Long v : bucket) {
- ReadWriteIOUtils.write(v, outputStream);
+ int len = bucket.size();
+ for (int i = 0; i < len; i++) {
+ ReadWriteIOUtils.write(bucket.get(i), outputStream);
}
}
System.out.println("hashtable bucket size: " + hashLookupTable.size());
@@ -230,7 +232,10 @@ public class MMHHIndex extends IoTDBIndex {
expectSize, hashLookupTable.size(), itemSize));
System.out.println("hashtable file size: " +
FileUtils.sizeOf(featureFile));
} catch (IOException e) {
- logger.error("Error when serialize router. Given up.", e);
+ logger.error("Error when serialize MMHH. Given up.", e);
+ } catch (ConcurrentModificationException e2) {
+ System.out.println("finished");
+ throw e2;
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/index/algorithm/rtree/RTree.java
b/server/src/main/java/org/apache/iotdb/db/index/algorithm/rtree/RTree.java
index ae62f45..2b0cfbe 100644
--- a/server/src/main/java/org/apache/iotdb/db/index/algorithm/rtree/RTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/index/algorithm/rtree/RTree.java
@@ -37,6 +37,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
+import java.util.ConcurrentModificationException;
import java.util.LinkedList;
import java.util.List;
import java.util.PriorityQueue;
@@ -408,32 +409,39 @@ public class RTree<T> {
private void serialize(
RNode node, OutputStream outputStream, BiConsumer<T, OutputStream>
serializeItem)
throws IOException {
- if (node instanceof Item) {
- ReadWriteIOUtils.write(ITEM, outputStream);
- } else if (node.isLeaf) {
- ReadWriteIOUtils.write(LEAF_NODE, outputStream);
- } else {
- ReadWriteIOUtils.write(INNER_NODE, outputStream);
- }
- for (float lb : node.lbs) {
- ReadWriteIOUtils.write(lb, outputStream);
- }
- if (!(node instanceof Item)) {
- for (float ub : node.ubs) {
- ReadWriteIOUtils.write(ub, outputStream);
+ try {
+ if (node instanceof Item) {
+ ReadWriteIOUtils.write(ITEM, outputStream);
+ } else if (node.isLeaf) {
+ ReadWriteIOUtils.write(LEAF_NODE, outputStream);
+ } else {
+ ReadWriteIOUtils.write(INNER_NODE, outputStream);
}
- }
- if (node instanceof Item) {
- T value = ((Item<T>) node).v;
- // ReadWriteIOUtils.write(value.toString(), outputStream);
- // only the series id
- serializeItem.accept(value, outputStream);
- } else {
- // write child
- ReadWriteIOUtils.write(node.getChildren().size(), outputStream);
- for (RNode child : node.getChildren()) {
- serialize(child, outputStream, serializeItem);
+ for (float lb : node.lbs) {
+ ReadWriteIOUtils.write(lb, outputStream);
+ }
+ if (!(node instanceof Item)) {
+ for (float ub : node.ubs) {
+ ReadWriteIOUtils.write(ub, outputStream);
+ }
+ }
+ if (node instanceof Item) {
+ T value = ((Item<T>) node).v;
+ // ReadWriteIOUtils.write(value.toString(), outputStream);
+ // only the series id
+ serializeItem.accept(value, outputStream);
+ } else {
+ // write child
+ ReadWriteIOUtils.write(node.getChildren().size(), outputStream);
+ List<RNode> children = node.getChildren();
+ int len = children.size();
+ for (int i = 0; i < len; i++) {
+ serialize(children.get(i), outputStream, serializeItem);
+ }
}
+ } catch (ConcurrentModificationException e) {
+ System.out.println("finished");
+ throw e;
}
}