This is an automated email from the ASF dual-hosted git repository.
hxd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new c161120 bug fix: version files of different storage groups are placed
into the same place (#138)
c161120 is described below
commit c1611203175719b09ea847c7fb34df56a01c9529
Author: Jiang Tian <[email protected]>
AuthorDate: Tue Apr 9 21:22:43 2019 -0500
bug fix: version files of different storage groups are placed into the same
place (#138)
* bug fix: version files of different storage groups are placed into the
same place.
---
.../db/engine/filenode/FileNodeProcessor.java | 20 +++---
.../version/SimpleFileVersionController.java | 24 +++++--
.../version/SimpleFileVersionControllerTest.java | 15 ++--
.../iotdb/db/integration/IoTDBVersionIT.java | 84 ++++++++++++++++++++++
4 files changed, 118 insertions(+), 25 deletions(-)
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
index 72348b6..6818a79 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
@@ -134,7 +134,6 @@ public class FileNodeProcessor extends Processor implements
IStatistic {
private FileNodeProcessorStore fileNodeProcessorStore;
private String fileNodeRestoreFilePath;
private final Object fileNodeRestoreLock = new Object();
- private String baseDirPath;
// last merge time
private long lastMergeTime = -1;
private BufferWriteProcessor bufferWriteProcessor = null;
@@ -242,16 +241,16 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
&& dirPath.charAt(dirPath.length() - 1) != File.separatorChar) {
dirPath = dirPath + File.separatorChar;
}
- this.baseDirPath = dirPath + processorName;
- File dataDir = new File(this.baseDirPath);
- if (!dataDir.exists()) {
- dataDir.mkdirs();
+
+ File restoreFolder = new File(dirPath + processorName);
+ if (!restoreFolder.exists()) {
+ restoreFolder.mkdirs();
LOGGER.info(
- "The data directory of the filenode processor {} doesn't exist.
Create new " +
+ "The restore directory of the filenode processor {} doesn't exist.
Create new " +
"directory {}",
- getProcessorName(), baseDirPath);
+ getProcessorName(), restoreFolder.getAbsolutePath());
}
- fileNodeRestoreFilePath = new File(dataDir, processorName +
RESTORE_FILE_SUFFIX).getPath();
+ fileNodeRestoreFilePath = new File(restoreFolder, processorName +
RESTORE_FILE_SUFFIX).getPath();
try {
fileNodeProcessorStore = readStoreFromDisk();
} catch (FileNodeProcessorException e) {
@@ -294,7 +293,7 @@ public class FileNodeProcessor extends Processor implements
IStatistic {
statMonitor.registerStatistics(statStorageDeltaName, this);
}
try {
- versionController = new SimpleFileVersionController(fileNodeDirPath);
+ versionController = new
SimpleFileVersionController(restoreFolder.getPath());
} catch (IOException e) {
throw new FileNodeProcessorException(e);
}
@@ -1982,7 +1981,6 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
isMerging == that.isMerging &&
Objects.equals(fileNodeProcessorStore, that.fileNodeProcessorStore) &&
Objects.equals(fileNodeRestoreFilePath, that.fileNodeRestoreFilePath)
&&
- Objects.equals(baseDirPath, that.baseDirPath) &&
Objects.equals(bufferWriteProcessor, that.bufferWriteProcessor) &&
Objects.equals(overflowProcessor, that.overflowProcessor) &&
Objects.equals(oldMultiPassTokenSet, that.oldMultiPassTokenSet) &&
@@ -2002,7 +2000,7 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
return Objects.hash(super.hashCode(), statStorageDeltaName,
statParamsHashMap, isOverflowed,
lastUpdateTimeMap, flushLastUpdateTimeMap, invertedIndexOfFiles,
emptyTsFileResource, currentTsFileResource, newFileNodes, isMerging,
- numOfMergeFile, fileNodeProcessorStore, fileNodeRestoreFilePath,
baseDirPath,
+ numOfMergeFile, fileNodeProcessorStore, fileNodeRestoreFilePath,
lastMergeTime, bufferWriteProcessor, overflowProcessor,
oldMultiPassTokenSet,
newMultiPassTokenSet, oldMultiPassLock, newMultiPassLock,
shouldRecovery, parameters,
fileSchema, flushFileNodeProcessorAction, bufferwriteFlushAction,
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/engine/version/SimpleFileVersionController.java
b/iotdb/src/main/java/org/apache/iotdb/db/engine/version/SimpleFileVersionController.java
index 07d5e7d..d9bc1f6 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/engine/version/SimpleFileVersionController.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/engine/version/SimpleFileVersionController.java
@@ -22,8 +22,6 @@ package org.apache.iotdb.db.engine.version;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
-import java.util.Arrays;
-import java.util.Comparator;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,12 +31,13 @@ import org.slf4j.LoggerFactory;
*/
public class SimpleFileVersionController implements VersionController {
private static final Logger LOGGER =
LoggerFactory.getLogger(SimpleFileVersionController.class);
+
/**
- * Every time currVersion - prevVersion >= SAVE_INTERVAL, currVersion is
persisted and prevVersion
+ * Every time currVersion - prevVersion >= saveInterval, currVersion is
persisted and prevVersion
* is set to currVersion. When recovering from file, the version number is
automatically increased
- * by SAVE_INTERVAL to avoid conflicts.
+ * by saveInterval to avoid conflicts.
*/
- public static final long SAVE_INTERVAL = 100;
+ private static long saveInterval = 100;
private static final String FILE_PREFIX = "Version-";
private long prevVersion;
private long currVersion;
@@ -70,7 +69,7 @@ public class SimpleFileVersionController implements
VersionController {
}
private void checkPersist() throws IOException {
- if ((currVersion - prevVersion) >= SAVE_INTERVAL) {
+ if ((currVersion - prevVersion) >= saveInterval) {
persist();
}
}
@@ -79,6 +78,8 @@ public class SimpleFileVersionController implements
VersionController {
File oldFile = new File(directoryPath, FILE_PREFIX + prevVersion);
File newFile = new File(directoryPath, FILE_PREFIX + currVersion);
FileUtils.moveFile(oldFile, newFile);
+ LOGGER.info("Version file updated, previous: {}, current: {}",
+ oldFile.getAbsolutePath(), newFile.getAbsolutePath());
prevVersion = currVersion;
}
@@ -109,7 +110,16 @@ public class SimpleFileVersionController implements
VersionController {
new FileOutputStream(versionFile).close();
}
// prevent overlapping in case of failure
- currVersion = prevVersion + SAVE_INTERVAL;
+ currVersion = prevVersion + saveInterval;
persist();
}
+
+ // test only method
+ public static void setSaveInterval(long saveInterval) {
+ SimpleFileVersionController.saveInterval = saveInterval;
+ }
+
+ public static long getSaveInterval() {
+ return saveInterval;
+ }
}
diff --git
a/iotdb/src/test/java/org/apache/iotdb/db/engine/version/SimpleFileVersionControllerTest.java
b/iotdb/src/test/java/org/apache/iotdb/db/engine/version/SimpleFileVersionControllerTest.java
index 0bc062d..cacde79 100644
---
a/iotdb/src/test/java/org/apache/iotdb/db/engine/version/SimpleFileVersionControllerTest.java
+++
b/iotdb/src/test/java/org/apache/iotdb/db/engine/version/SimpleFileVersionControllerTest.java
@@ -19,16 +19,15 @@
package org.apache.iotdb.db.engine.version;
+
+import static org.junit.Assert.assertEquals;
+
import java.io.File;
import java.io.IOException;
-
import org.apache.commons.io.FileUtils;
import org.junit.Assert;
import org.junit.Test;
-import static
org.apache.iotdb.db.engine.version.SimpleFileVersionController.SAVE_INTERVAL;
-import static org.junit.Assert.assertEquals;
-
public class SimpleFileVersionControllerTest {
@Test
public void test() throws IOException {
@@ -39,13 +38,15 @@ public class SimpleFileVersionControllerTest {
Assert.fail("can not create version.tmp folder");
}
VersionController versionController = new
SimpleFileVersionController(tempFilePath);
- assertEquals(SAVE_INTERVAL, versionController.currVersion());
+ assertEquals(SimpleFileVersionController.getSaveInterval(),
versionController.currVersion());
for (int i = 0; i < 150; i++) {
versionController.nextVersion();
}
- assertEquals(SAVE_INTERVAL + 150, versionController.currVersion());
+ assertEquals(SimpleFileVersionController.getSaveInterval() + 150,
+ versionController.currVersion());
versionController = new SimpleFileVersionController(tempFilePath);
- assertEquals(SAVE_INTERVAL + 200, versionController.currVersion());
+ assertEquals(SimpleFileVersionController.getSaveInterval() + 200,
+ versionController.currVersion());
} finally {
FileUtils.deleteDirectory(new File(tempFilePath));
}
diff --git
a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBVersionIT.java
b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBVersionIT.java
new file mode 100644
index 0000000..ba42cd0
--- /dev/null
+++ b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBVersionIT.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.integration;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import org.apache.iotdb.db.engine.version.SimpleFileVersionController;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class IoTDBVersionIT {
+
+ private IoTDB deamon;
+
+ @Before
+ public void setUp() throws Exception {
+ EnvironmentUtils.closeMemControl();
+ deamon = IoTDB.getInstance();
+ deamon.active();
+ EnvironmentUtils.envSetUp();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ deamon.stop();
+ EnvironmentUtils.cleanEnv();
+ }
+
+ @Test
+ public void testVersionPersist() throws SQLException, ClassNotFoundException
{
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ try(Connection connection = DriverManager
+ .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/",
+ "root", "root")){
+ Statement statement = connection.createStatement();
+
+ statement.execute("SET STORAGE GROUP TO root.versionTest1");
+ statement.execute("SET STORAGE GROUP TO root.versionTest2");
+ statement.execute("CREATE TIMESERIES root.versionTest1.s0"
+ + " WITH DATATYPE=INT32,ENCODING=PLAIN");
+ statement.execute("CREATE TIMESERIES root.versionTest2.s0"
+ + " WITH DATATYPE=INT32,ENCODING=PLAIN");
+
+ // write and flush enough times to make the version file persist
+ for (int i = 0; i < 3 * SimpleFileVersionController.getSaveInterval(); i
++) {
+ for (int j = 1; j <= 100; j ++) {
+ statement.execute(String
+ .format("INSERT INTO root.versionTest1(timestamp, s0) VALUES
(%d, %d)", i*100+j, j));
+ }
+ statement.execute("FLUSH");
+ for (int j = 1; j <= 100; j ++) {
+ statement.execute(String
+ .format("INSERT INTO root.versionTest2(timestamp, s0) VALUES
(%d, %d)", i*100+j, j));
+ }
+ statement.execute("FLUSH");
+ statement.execute("MERGE");
+ }
+
+ statement.close();
+ }
+ }
+}