This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 7a2e034 continue writing the last unclosed file (#700)
7a2e034 is described below
commit 7a2e034ffd76dc5f5cb133244966cbad9f5561e1
Author: Jiang Tian <[email protected]>
AuthorDate: Fri Jan 3 19:18:11 2020 +0800
continue writing the last unclosed file (#700)
---
.../engine/storagegroup/StorageGroupProcessor.java | 39 ++++++++++----
.../db/engine/storagegroup/TsFileProcessor.java | 16 ++++++
.../db/engine/storagegroup/TsFileResource.java | 28 ++++++++++
.../writelog/recover/TsFileRecoverPerformer.java | 25 ++++++---
.../db/writelog/recover/SeqTsFileRecoverTest.java | 60 ++++++++++++++++++++--
.../writelog/recover/UnseqTsFileRecoverTest.java | 2 +-
6 files changed, 150 insertions(+), 20 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 8102955..92f9542 100755
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -86,6 +86,7 @@ import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.Schema;
+import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -301,24 +302,42 @@ public class StorageGroupProcessor {
}
private void recoverSeqFiles(List<TsFileResource> tsFiles) throws
StorageGroupProcessorException {
- for (TsFileResource tsFileResource : tsFiles) {
+ for (int i = 0; i < tsFiles.size(); i++) {
+ TsFileResource tsFileResource = tsFiles.get(i);
sequenceFileList.add(tsFileResource);
TsFileRecoverPerformer recoverPerformer = new
TsFileRecoverPerformer(storageGroupName + "-"
- , schema, versionController, tsFileResource, false);
- recoverPerformer.recover();
- tsFileResource.setClosed(true);
+ , schema, versionController, tsFileResource, false, i ==
tsFiles.size() - 1);
+ RestorableTsFileIOWriter writer = recoverPerformer.recover();
+ if (i != tsFiles.size() - 1) {
+ // not the last file, just close it
+ tsFileResource.setClosed(true);
+ } else if (writer.canWrite()) {
+ // the last file is not closed, continue writing to in
+ workSequenceTsFileProcessor = new TsFileProcessor(storageGroupName,
tsFileResource,
+ schema, versionController, this::closeUnsealedTsFileProcessor,
+ this::updateLatestFlushTimeCallback, true, writer);
+ }
}
}
private void recoverUnseqFiles(List<TsFileResource> tsFiles)
throws StorageGroupProcessorException {
- for (TsFileResource tsFileResource : tsFiles) {
+ for (int i = 0; i < tsFiles.size(); i++) {
+ TsFileResource tsFileResource = tsFiles.get(i);
unSequenceFileList.add(tsFileResource);
- TsFileRecoverPerformer recoverPerformer = new
TsFileRecoverPerformer(storageGroupName + "-",
- schema,
- versionController, tsFileResource, true);
- recoverPerformer.recover();
- tsFileResource.setClosed(true);
+ TsFileRecoverPerformer recoverPerformer = new
TsFileRecoverPerformer(storageGroupName + "-"
+ , schema, versionController, tsFileResource, true, i ==
tsFiles.size() - 1);
+ RestorableTsFileIOWriter writer = recoverPerformer.recover();
+ if (i != tsFiles.size() - 1) {
+ // not the last file, just close it
+ tsFileResource.setClosed(true);
+ } else if (writer.canWrite()) {
+ // the last file is not closed, continue writing to in
+ workUnSequenceTsFileProcessor = new TsFileProcessor(storageGroupName,
tsFileResource,
+ schema, versionController, this::closeUnsealedTsFileProcessor,
+ () -> true, false, writer);
+ }
+
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 2680b96..110c34e 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -135,6 +135,20 @@ public class TsFileProcessor {
this.tsFileResource.setHistoricalVersions(Collections.singleton(versionController.currVersion()));
}
+ public TsFileProcessor(String storageGroupName, TsFileResource
tsFileResource, Schema schema,
+ VersionController versionController, CloseTsFileCallBack
closeUnsealedTsFileProcessor,
+ Supplier updateLatestFlushTimeCallback, boolean sequence,
RestorableTsFileIOWriter writer) {
+ this.storageGroupName =storageGroupName;
+ this.tsFileResource = tsFileResource;
+ this.schema = schema;
+ this.versionController = versionController;
+ this.writer = writer;
+ this.closeTsFileCallback = closeUnsealedTsFileProcessor;
+ this.updateLatestFlushTimeCallback = updateLatestFlushTimeCallback;
+ this.sequence = sequence;
+ logger.info("reopen a tsfile processor {}", tsFileResource.getFile());
+ }
+
/**
* insert data in an InsertPlan into the workingMemtable.
*
@@ -286,6 +300,7 @@ public class TsFileProcessor {
return;
}
shouldClose = true;
+ tsFileResource.setCloseFlag();
// when a flush thread serves this TsFileProcessor (because the
processor is submitted by
// registerTsFileProcessor()), the thread will seal the corresponding
TsFile and
// execute other cleanup works if (shouldClose == true and
flushingMemTables is empty).
@@ -494,6 +509,7 @@ public class TsFileProcessor {
tsFileResource.serialize();
writer.endFile(schema);
+ tsFileResource.cleanCloseFlag();
// remove this processor from Closing list in StorageGroupProcessor,
// mark the TsFileResource closed, no need writer anymore
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index 77dccba..4c02c55 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -43,14 +43,19 @@ import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class TsFileResource {
+ private static final Logger logger =
LoggerFactory.getLogger(TsFileResource.class);
+
// tsfile
private File file;
public static final String RESOURCE_SUFFIX = ".resource";
static final String TEMP_SUFFIX = ".temp";
+ private static final String CLOSING_SUFFIX = ".closing";
/**
* device -> start time
@@ -348,6 +353,29 @@ public class TsFileResource {
return false;
}
+ /**
+ * set a file flag indicating that the file is being closed, so during
recovery we could know
+ * we should close the file.
+ */
+ public void setCloseFlag() {
+ try {
+ new File(file.getAbsoluteFile() + CLOSING_SUFFIX).createNewFile();
+ } catch (IOException e) {
+ logger.error("Cannot create close flag for {}", file, e);
+ }
+ }
+
+ /**
+ * clean the close flag when the file is successfully closed.
+ */
+ public void cleanCloseFlag() {
+ new File(file.getAbsoluteFile() + CLOSING_SUFFIX).delete();
+ }
+
+ public boolean isCloseFlagSet() {
+ return new File(file.getAbsoluteFile() + CLOSING_SUFFIX).exists();
+ }
+
public Set<Long> getHistoricalVersions() {
return historicalVersions;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
index ec29216..77598ab 100644
---
a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
+++
b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
@@ -64,23 +64,27 @@ public class TsFileRecoverPerformer {
private LogReplayer logReplayer;
private TsFileResource tsFileResource;
private boolean acceptUnseq;
+ private boolean isLastFile;
public TsFileRecoverPerformer(String logNodePrefix,
Schema schema, VersionController versionController,
- TsFileResource currentTsFileResource, boolean acceptUnseq) {
+ TsFileResource currentTsFileResource, boolean acceptUnseq, boolean
isLastFile) {
this.insertFilePath = currentTsFileResource.getFile().getPath();
this.logNodePrefix = logNodePrefix;
this.schema = schema;
this.versionController = versionController;
this.tsFileResource = currentTsFileResource;
this.acceptUnseq = acceptUnseq;
+ this.isLastFile = isLastFile;
}
/**
* 1. recover the TsFile by RestorableTsFileIOWriter and truncate the file
to remaining corrected
* data 2. redo the WALs to recover unpersisted data 3. flush and close the
file 4. clean WALs
+ * @return a RestorableTsFileIOWriter if the file is not closed before
crush, so this writer
+ * can be used to continue writing
*/
- public void recover() throws StorageGroupProcessorException {
+ public RestorableTsFileIOWriter recover() throws
StorageGroupProcessorException {
IMemTable recoverMemTable = new PrimitiveMemTable();
this.logReplayer = new LogReplayer(logNodePrefix, insertFilePath,
tsFileResource.getModFile(),
@@ -89,7 +93,7 @@ public class TsFileRecoverPerformer {
File insertFile = FSFactoryProducer.getFSFactory().getFile(insertFilePath);
if (!insertFile.exists()) {
logger.error("TsFile {} is missing, will skip its recovery.",
insertFilePath);
- return;
+ return null;
}
// remove corrupted part of the TsFile
RestorableTsFileIOWriter restorableTsFileIOWriter;
@@ -118,7 +122,7 @@ public class TsFileRecoverPerformer {
tsFileResource.setHistoricalVersions(Collections.singleton(fileVersion));
tsFileResource.serialize();
}
- return;
+ return null;
} catch (IOException e) {
throw new StorageGroupProcessorException(
"recover the resource file failed: " + insertFilePath
@@ -140,6 +144,8 @@ public class TsFileRecoverPerformer {
} catch (IOException e) {
throw new StorageGroupProcessorException(e);
}
+
+ return restorableTsFileIOWriter;
}
private void recoverResourceFromFile() throws IOException {
@@ -202,8 +208,15 @@ public class TsFileRecoverPerformer {
restorableTsFileIOWriter,
tsFileResource.getFile().getParentFile().getName());
tableFlushTask.syncFlushMemTable();
}
- // close file
- restorableTsFileIOWriter.endFile(schema);
+
+ if (!isLastFile || isLastFile && tsFileResource.isCloseFlagSet()) {
+ // end the file if it is not the last file or it is closed before crush
+ restorableTsFileIOWriter.endFile(schema);
+ tsFileResource.cleanCloseFlag();
+ }
+ // otherwise this file is not closed before crush, do nothing so we can
continue writing
+ // into it
+
tsFileResource.serialize();
} catch (IOException | InterruptedException | ExecutionException e) {
throw new StorageGroupProcessorException(e);
diff --git
a/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
b/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
index cbfd1fa..70645b5 100644
---
a/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
@@ -20,6 +20,8 @@
package org.apache.iotdb.db.writelog.recover;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
@@ -50,6 +52,7 @@ import org.apache.iotdb.tsfile.write.record.TSRecord;
import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.Schema;
+import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -136,11 +139,62 @@ public class SeqTsFileRecoverTest {
}
@Test
- public void test() throws StorageGroupProcessorException, IOException {
+ public void testNonLastRecovery() throws StorageGroupProcessorException,
IOException {
TsFileRecoverPerformer performer = new
TsFileRecoverPerformer(logNodePrefix, schema,
- versionController, resource, true);
+ versionController, resource, true, false);
ActiveTimeSeriesCounter.getInstance().init(logNodePrefix);
- performer.recover();
+ RestorableTsFileIOWriter writer = performer.recover();
+ assertFalse(writer.canWrite());
+
+ assertEquals(2, (long) resource.getStartTimeMap().get("device99"));
+ assertEquals(100, (long) resource.getEndTimeMap().get("device99"));
+ for (int i = 0; i < 10; i++) {
+ assertEquals(0, (long) resource.getStartTimeMap().get("device" + i));
+ assertEquals(19, (long) resource.getEndTimeMap().get("device" + i));
+ }
+
+ ReadOnlyTsFile readOnlyTsFile = new ReadOnlyTsFile(new
TsFileSequenceReader(tsF.getPath()));
+ List<Path> pathList = new ArrayList<>();
+ for (int j = 0; j < 10; j++) {
+ for (int k = 0; k < 10; k++) {
+ pathList.add(new Path("device" + j, "sensor" + k));
+ }
+ }
+ QueryExpression queryExpression = QueryExpression.create(pathList, null);
+ QueryDataSet dataSet = readOnlyTsFile.query(queryExpression);
+ for (int i = 0; i < 20; i++) {
+ RowRecord record = dataSet.next();
+ assertEquals(i, record.getTimestamp());
+ List<Field> fields = record.getFields();
+ assertEquals(100, fields.size());
+ for (int j = 0; j < 100; j++) {
+ assertEquals(j % 10, fields.get(j).getLongV());
+ }
+ }
+
+ pathList = new ArrayList<>();
+ pathList.add(new Path("device99", "sensor1"));
+ pathList.add(new Path("device99", "sensor4"));
+ queryExpression = QueryExpression.create(pathList, null);
+ dataSet = readOnlyTsFile.query(queryExpression);
+ Assert.assertTrue(dataSet.hasNext());
+ RowRecord record = dataSet.next();
+ Assert.assertEquals("2\t0\tnull", record.toString());
+ Assert.assertTrue(dataSet.hasNext());
+ record = dataSet.next();
+ Assert.assertEquals("100\tnull\t0", record.toString());
+
+ readOnlyTsFile.close();
+ }
+
+ @Test
+ public void testLastRecovery() throws StorageGroupProcessorException,
IOException {
+ TsFileRecoverPerformer performer = new
TsFileRecoverPerformer(logNodePrefix, schema,
+ versionController, resource, true, true);
+ ActiveTimeSeriesCounter.getInstance().init(logNodePrefix);
+ RestorableTsFileIOWriter writer = performer.recover();
+ assertTrue(writer.canWrite());
+ writer.endFile(schema);
assertEquals(2, (long) resource.getStartTimeMap().get("device99"));
assertEquals(100, (long) resource.getEndTimeMap().get("device99"));
diff --git
a/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
b/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
index 351228c..e68f4c4 100644
---
a/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
@@ -146,7 +146,7 @@ public class UnseqTsFileRecoverTest {
@Test
public void test() throws StorageGroupProcessorException, IOException {
TsFileRecoverPerformer performer = new
TsFileRecoverPerformer(logNodePrefix, schema,
- versionController, resource, true);
+ versionController, resource, true, false);
ActiveTimeSeriesCounter.getInstance().init(logNodePrefix);
performer.recover();