This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new abc189c fix channel close bug in merge process (#103)
abc189c is described below
commit abc189cd3389e839485f548cc2aa32d43c21c8e0
Author: Beyyes <[email protected]>
AuthorDate: Wed Mar 27 21:45:34 2019 +0800
fix channel close bug in merge process (#103)
* fix channel close bug, add file reader remove function invoking in merge
process and make SysTimeVersionControllerTest more stable
---
.../db/engine/filenode/FileNodeProcessor.java | 36 ++++--
.../db/engine/overflow/io/OverflowProcessor.java | 2 +
.../db/engine/overflow/io/OverflowResource.java | 1 +
.../iotdb/db/query/control/FileReaderManager.java | 2 +-
.../db/query/control/OpenedFilePathsManager.java | 4 +-
.../db/query/factory/SeriesReaderFactory.java | 1 +
.../version/SysTimeVersionControllerTest.java | 14 ++-
.../db/integration/IoTDBFlushQueryMergeTest.java | 130 +++++++++++++++++++++
8 files changed, 176 insertions(+), 14 deletions(-)
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
index 419648d..72348b6 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
@@ -59,6 +59,7 @@ import
org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.overflow.io.OverflowProcessor;
import org.apache.iotdb.db.engine.pool.MergeManager;
import org.apache.iotdb.db.engine.querycontext.GlobalSortedSeriesDataSource;
+import org.apache.iotdb.db.engine.querycontext.OverflowInsertFile;
import org.apache.iotdb.db.engine.querycontext.OverflowSeriesDataSource;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
@@ -76,6 +77,7 @@ import org.apache.iotdb.db.monitor.IStatistic;
import org.apache.iotdb.db.monitor.MonitorConstants;
import org.apache.iotdb.db.monitor.StatMonitor;
import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.query.factory.SeriesReaderFactory;
import org.apache.iotdb.db.query.reader.IReader;
import org.apache.iotdb.db.sync.conf.Constans;
@@ -748,7 +750,7 @@ public class FileNodeProcessor extends Processor implements
IStatistic {
* query data.
*/
public <T extends Comparable<T>> QueryDataSource query(String deviceId,
String measurementId,
- QueryContext context) throws FileNodeProcessorException {
+ QueryContext context) throws FileNodeProcessorException {
// query overflow data
MeasurementSchema mSchema;
TSDataType dataType;
@@ -1352,6 +1354,7 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
// overflow switch from merge to work
overflowProcessor.switchMergeToWork();
+
// write status to file
isMerging = FileNodeProcessorStatus.NONE;
synchronized (fileNodeProcessorStore) {
@@ -1406,15 +1409,19 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
}
}
- private void deleteBufferWriteFiles(List<File> bufferwriteDirList,
Set<String> bufferFiles) {
+ private void deleteBufferWriteFiles(List<File> bufferwriteDirList,
Set<String> bufferFiles)
+ throws IOException {
for (File bufferwriteDir : bufferwriteDirList) {
File[] files = bufferwriteDir.listFiles();
if (files == null) {
continue;
}
for (File file : files) {
- if (!bufferFiles.contains(file.getPath()) && !file.delete()) {
- LOGGER.warn("Cannot delete BufferWrite file {}", file.getPath());
+ if (!bufferFiles.contains(file.getPath())) {
+
FileReaderManager.getInstance().closeFileAndRemoveReader(file.getPath());
+ if (!file.delete()) {
+ LOGGER.warn("Cannot delete BufferWrite file {}", file.getPath());
+ }
}
}
}
@@ -1442,6 +1449,8 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
mergeDeleteLock.lock();
QueryContext context = new QueryContext();
try {
+
FileReaderManager.getInstance().increaseFileReaderReference(backupIntervalFile.getFilePath(),
+ true);
for (String deviceId : backupIntervalFile.getStartTimeMap().keySet()) {
// query one deviceId
List<Path> pathList = new ArrayList<>();
@@ -1471,11 +1480,17 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
.and(TimeFilter.gtEq(backupIntervalFile.getStartTime(deviceId)),
TimeFilter.ltEq(backupIntervalFile.getEndTime(deviceId)));
SingleSeriesExpression seriesFilter = new
SingleSeriesExpression(path, timeFilter);
+
+ for (OverflowInsertFile overflowInsertFile :
overflowSeriesDataSource.getOverflowInsertFileList()) {
+
FileReaderManager.getInstance().increaseFileReaderReference(overflowInsertFile.getFilePath(),
+ false);
+ }
+
IReader seriesReader = SeriesReaderFactory.getInstance()
.createSeriesReaderForMerge(backupIntervalFile,
overflowSeriesDataSource, seriesFilter, context);
numOfChunk += queryAndWriteSeries(seriesReader, path, seriesFilter,
dataType,
- startTimeMap, endTimeMap);
+ startTimeMap, endTimeMap, overflowSeriesDataSource);
}
if (mergeIsChunkGroupHasData) {
// end the new rowGroupMetadata
@@ -1485,6 +1500,9 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
}
}
} finally {
+
FileReaderManager.getInstance().decreaseFileReaderReference(backupIntervalFile.getFilePath(),
+ true);
+
if (mergeDeleteLock.isLocked()) {
mergeDeleteLock.unlock();
}
@@ -1505,7 +1523,8 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
private int queryAndWriteSeries(IReader seriesReader, Path path,
SingleSeriesExpression seriesFilter, TSDataType dataType,
- Map<String, Long> startTimeMap, Map<String, Long> endTimeMap)
+ Map<String, Long> startTimeMap, Map<String, Long> endTimeMap,
+ OverflowSeriesDataSource overflowSeriesDataSource)
throws IOException {
int numOfChunk = 0;
try {
@@ -1551,7 +1570,10 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
seriesWriterImpl.writeToFileWriter(mergeFileWriter);
}
} finally {
- seriesReader.close();
+ for (OverflowInsertFile overflowInsertFile :
overflowSeriesDataSource.getOverflowInsertFileList()) {
+
FileReaderManager.getInstance().decreaseFileReaderReference(overflowInsertFile.getFilePath(),
+ false);
+ }
}
return numOfChunk;
}
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
index b4669ec..0debfe4 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
@@ -51,6 +51,7 @@ import org.apache.iotdb.db.engine.version.VersionController;
import org.apache.iotdb.db.exception.OverflowProcessorException;
import org.apache.iotdb.db.qp.constant.DatetimeUtils;
import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.utils.ImmediateFuture;
import org.apache.iotdb.db.utils.MemUtils;
import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
@@ -409,6 +410,7 @@ public class OverflowProcessor extends Processor {
public void switchMergeToWork() throws IOException {
if (mergeResource != null) {
+
FileReaderManager.getInstance().closeFileAndRemoveReader(mergeResource.getInsertFilePath());
mergeResource.close();
mergeResource.deleteResource();
mergeResource = null;
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowResource.java
b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowResource.java
index 67916b2..fbaf317 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowResource.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowResource.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.version.VersionController;
import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.utils.MemUtils;
import org.apache.iotdb.db.utils.QueryUtils;
import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
index 90e1932..461ff0b 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
@@ -91,8 +91,8 @@ public class FileReaderManager implements IService {
executorService.scheduleAtFixedRate(() -> {
synchronized (this) {
- clearMap(unclosedFileReaderMap, unclosedReferenceMap);
clearMap(closedFileReaderMap, closedReferenceMap);
+ clearMap(unclosedFileReaderMap, unclosedReferenceMap);
}
}, 0, examinePeriod, TimeUnit.MILLISECONDS);
}
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/control/OpenedFilePathsManager.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/control/OpenedFilePathsManager.java
index 585d85d..bc6ee38 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/control/OpenedFilePathsManager.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/control/OpenedFilePathsManager.java
@@ -94,11 +94,11 @@ public class OpenedFilePathsManager {
jobIdContainer.remove();
for (String filePath : closedFilePathsMap.get(jobId)) {
- FileReaderManager.getInstance().decreaseFileReaderReference(filePath,
false);
+ FileReaderManager.getInstance().decreaseFileReaderReference(filePath,
true);
}
closedFilePathsMap.remove(jobId);
for (String filePath : unclosedFilePathsMap.get(jobId)) {
- FileReaderManager.getInstance().decreaseFileReaderReference(filePath,
true);
+ FileReaderManager.getInstance().decreaseFileReaderReference(filePath,
false);
}
unclosedFilePathsMap.remove(jobId);
}
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
index 0e14b4b..42e81f2 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
@@ -160,6 +160,7 @@ public class SeriesReaderFactory {
SingleSeriesExpression singleSeriesExpression,
QueryContext context)
throws IOException {
+
TsFileSequenceReader tsFileSequenceReader = FileReaderManager.getInstance()
.get(fileNode.getFilePath(), true);
ChunkLoaderImpl chunkLoader = new ChunkLoaderImpl(tsFileSequenceReader);
diff --git
a/iotdb/src/test/java/org/apache/iotdb/db/engine/version/SysTimeVersionControllerTest.java
b/iotdb/src/test/java/org/apache/iotdb/db/engine/version/SysTimeVersionControllerTest.java
index 1912d8a..f494da7 100644
---
a/iotdb/src/test/java/org/apache/iotdb/db/engine/version/SysTimeVersionControllerTest.java
+++
b/iotdb/src/test/java/org/apache/iotdb/db/engine/version/SysTimeVersionControllerTest.java
@@ -29,9 +29,15 @@ public class SysTimeVersionControllerTest {
public void test() {
VersionController versionController = SysTimeVersionController.INSTANCE;
long diff = versionController.currVersion() - System.currentTimeMillis();
- // TODO these comparisons can fail in very rare conditions, how to fix?
- assertTrue(diff >= -2 && diff <= 2);
- diff = versionController.nextVersion() - System.currentTimeMillis();
- assertTrue(diff >= -2 && diff <= 2);
+ // to avoid the test failure on a poor machine, we tolerate a 200ms difference
here.
+ assertTrue(diff >= -200 && diff <= 200);
+ diff = versionController.nextVersion();
+ try {
+ Thread.sleep(200);
+ diff -= System.currentTimeMillis();
+ assertTrue(diff >= -1000 && diff <= -200);
+ } catch (InterruptedException e) {
+ //do nothing
+ }
}
}
\ No newline at end of file
diff --git
a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBFlushQueryMergeTest.java
b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBFlushQueryMergeTest.java
new file mode 100644
index 0000000..39691bc
--- /dev/null
+++
b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBFlushQueryMergeTest.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.integration;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import java.sql.*;
+import static org.junit.Assert.fail;
+
+public class IoTDBFlushQueryMergeTest {
+
+ private static IoTDB daemon;
+ private static String[] sqls = new String[]{
+ "SET STORAGE GROUP TO root.vehicle.d0",
+ "CREATE TIMESERIES root.vehicle.d0.s0 WITH DATATYPE=INT32, ENCODING=RLE",
+
+ "insert into root.vehicle.d0(timestamp,s0) values(1,101)",
+ "insert into root.vehicle.d0(timestamp,s0) values(2,198)",
+ "insert into root.vehicle.d0(timestamp,s0) values(100,99)",
+ "insert into root.vehicle.d0(timestamp,s0) values(101,99)",
+ "insert into root.vehicle.d0(timestamp,s0) values(102,80)",
+ "insert into root.vehicle.d0(timestamp,s0) values(103,99)",
+ "insert into root.vehicle.d0(timestamp,s0) values(104,90)",
+ "insert into root.vehicle.d0(timestamp,s0) values(105,99)",
+ "insert into root.vehicle.d0(timestamp,s0) values(106,99)",
+ "flush",
+ "insert into root.vehicle.d0(timestamp,s0) values(2,10000)",
+ "insert into root.vehicle.d0(timestamp,s0) values(50,10000)",
+ "insert into root.vehicle.d0(timestamp,s0) values(1000,22222)",
+
+ };
+
+ private static IoTDBConfig iotDBConfig =
IoTDBDescriptor.getInstance().getConfig();
+ private static long overflowFileSizeThreshold;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ EnvironmentUtils.closeStatMonitor();
+ EnvironmentUtils.closeMemControl();
+ daemon = IoTDB.getInstance();
+ daemon.active();
+ EnvironmentUtils.envSetUp();
+ overflowFileSizeThreshold = iotDBConfig.getOverflowFileSizeThreshold();
+ iotDBConfig.setOverflowFileSizeThreshold(0);
+ insertData();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ daemon.stop();
+ iotDBConfig.setOverflowFileSizeThreshold(overflowFileSizeThreshold);
+ EnvironmentUtils.cleanEnv();
+ }
+
+ private static void insertData() throws ClassNotFoundException, SQLException
{
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ Connection connection = null;
+ try {
+ connection = DriverManager
+ .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
"root");
+ Statement statement = connection.createStatement();
+ for (String sql : sqls) {
+ statement.execute(sql);
+ }
+ statement.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ if (connection != null) {
+ connection.close();
+ }
+ }
+ }
+
+ @Test
+ public void selectAllSQLTest() throws ClassNotFoundException, SQLException {
+
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ Connection connection = null;
+ try {
+ connection = DriverManager
+ .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
"root");
+ Statement statement = connection.createStatement();
+ boolean hasResultSet = statement.execute("select * from root");
+ Assert.assertTrue(hasResultSet);
+
+ ResultSet resultSet = statement.getResultSet();
+ int cnt = 0;
+ while (resultSet.next()) {
+ cnt++;
+ }
+ statement.close();
+
+ statement = connection.createStatement();
+ statement.execute("merge");
+ statement.close();
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ } finally {
+ if (connection != null) {
+ connection.close();
+ }
+ }
+ }
+}