This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch continue_file_after_recovery in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 55ab7868ad5ff65b6d496f8217a8d029899c55bf Author: jt2594838 <[email protected]> AuthorDate: Tue Dec 31 20:55:55 2019 +0800 continue writing the last unclosed file --- .../engine/storagegroup/StorageGroupProcessor.java | 39 ++++++++++---- .../db/engine/storagegroup/TsFileProcessor.java | 17 +++++- .../db/engine/storagegroup/TsFileResource.java | 29 +++++++++++ .../writelog/recover/TsFileRecoverPerformer.java | 25 ++++++--- .../db/writelog/recover/SeqTsFileRecoverTest.java | 60 ++++++++++++++++++++-- .../writelog/recover/UnseqTsFileRecoverTest.java | 2 +- 6 files changed, 151 insertions(+), 21 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java index 741b902..0089895 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java @@ -85,6 +85,7 @@ import org.apache.iotdb.tsfile.read.common.Path; import org.apache.iotdb.tsfile.utils.Pair; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import org.apache.iotdb.tsfile.write.schema.Schema; +import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -287,24 +288,42 @@ public class StorageGroupProcessor { } private void recoverSeqFiles(List<TsFileResource> tsFiles) throws StorageGroupProcessorException { - for (TsFileResource tsFileResource : tsFiles) { + for (int i = 0; i < tsFiles.size(); i++) { + TsFileResource tsFileResource = tsFiles.get(i); sequenceFileList.add(tsFileResource); TsFileRecoverPerformer recoverPerformer = new TsFileRecoverPerformer(storageGroupName + "-" - , schema, versionController, tsFileResource, false); - recoverPerformer.recover(); - tsFileResource.setClosed(true); + , schema, versionController, tsFileResource, false, i == 
tsFiles.size() - 1); + RestorableTsFileIOWriter writer = recoverPerformer.recover(); + if (i != tsFiles.size() - 1) { + // not the last file, just close it + tsFileResource.setClosed(true); + } else if (writer.canWrite()) { + // the last file is not closed, continue writing to it + workSequenceTsFileProcessor = new TsFileProcessor(storageGroupName, tsFileResource, + schema, versionController, this::closeUnsealedTsFileProcessor, + this::updateLatestFlushTimeCallback, true, writer); + } } } private void recoverUnseqFiles(List<TsFileResource> tsFiles) throws StorageGroupProcessorException { - for (TsFileResource tsFileResource : tsFiles) { + for (int i = 0; i < tsFiles.size(); i++) { + TsFileResource tsFileResource = tsFiles.get(i); unSequenceFileList.add(tsFileResource); - TsFileRecoverPerformer recoverPerformer = new TsFileRecoverPerformer(storageGroupName + "-", - schema, - versionController, tsFileResource, true); - recoverPerformer.recover(); - tsFileResource.setClosed(true); + TsFileRecoverPerformer recoverPerformer = new TsFileRecoverPerformer(storageGroupName + "-" + , schema, versionController, tsFileResource, true, i == tsFiles.size() - 1); + RestorableTsFileIOWriter writer = recoverPerformer.recover(); + if (i != tsFiles.size() - 1) { + // not the last file, just close it + tsFileResource.setClosed(true); + } else if (writer.canWrite()) { + // the last file is not closed, continue writing to it + workUnSequenceTsFileProcessor = new TsFileProcessor(storageGroupName, tsFileResource, + schema, versionController, this::closeUnsealedTsFileProcessor, + () -> true, false, writer); + } + } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java index eec8828..26698a7 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java +++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java @@ -48,7 +48,6 @@ import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.CloseTsFile import org.apache.iotdb.db.engine.version.VersionController; import org.apache.iotdb.db.exception.TsFileProcessorException; import org.apache.iotdb.db.exception.query.QueryProcessException; -import org.apache.iotdb.db.metadata.MManager; import org.apache.iotdb.db.qp.constant.DatetimeUtils; import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan; import org.apache.iotdb.db.qp.physical.crud.InsertPlan; @@ -133,6 +132,20 @@ public class TsFileProcessor { logger.info("create a new tsfile processor {}", tsfile.getAbsolutePath()); } + public TsFileProcessor(String storageGroupName, TsFileResource tsFileResource, Schema schema, + VersionController versionController, CloseTsFileCallBack closeUnsealedTsFileProcessor, + Supplier updateLatestFlushTimeCallback, boolean sequence, RestorableTsFileIOWriter writer) { + this.storageGroupName =storageGroupName; + this.tsFileResource = tsFileResource; + this.schema = schema; + this.versionController = versionController; + this.writer = writer; + this.closeTsFileCallback = closeUnsealedTsFileProcessor; + this.updateLatestFlushTimeCallback = updateLatestFlushTimeCallback; + this.sequence = sequence; + logger.info("reopen a tsfile processor {}", tsFileResource.getFile()); + } + /** * insert data in an InsertPlan into the workingMemtable. * @@ -284,6 +297,7 @@ public class TsFileProcessor { return; } shouldClose = true; + tsFileResource.setCloseFlag(); // when a flush thread serves this TsFileProcessor (because the processor is submitted by // registerTsFileProcessor()), the thread will seal the corresponding TsFile and // execute other cleanup works if (shouldClose == true and flushingMemTables is empty). 
@@ -492,6 +506,7 @@ public class TsFileProcessor { tsFileResource.serialize(); writer.endFile(schema); + tsFileResource.cleanCloseFlag(); // remove this processor from Closing list in StorageGroupProcessor, // mark the TsFileResource closed, no need writer anymore diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java index 32817cf..656851e 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java @@ -39,14 +39,19 @@ import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData; import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer; import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class TsFileResource { + private static final Logger logger = LoggerFactory.getLogger(TsFileResource.class); + // tsfile private File file; public static final String RESOURCE_SUFFIX = ".resource"; static final String TEMP_SUFFIX = ".temp"; + private static final String CLOSING_SUFFIX = ".closing"; /** * device -> start time @@ -327,4 +332,28 @@ public class TsFileResource { } return false; } + + /** + * set a file flag indicating that the file is being closed, so during recovery we could know + * we should close the file. + */ + public void setCloseFlag() { + try { + new File(file.getAbsoluteFile() + CLOSING_SUFFIX).createNewFile(); + } catch (IOException e) { + logger.error("Cannot create close flag for {}", file, e); + } + } + + /** + * clean the close flag when the file is successfully closed. 
+ */ + public void cleanCloseFlag() { + new File(file.getAbsoluteFile() + CLOSING_SUFFIX).delete(); + } + + public boolean isCloseFlagSet() { + return new File(file.getAbsoluteFile() + CLOSING_SUFFIX).exists(); + } + } diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java index 148cf73..2faac3b 100644 --- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java +++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java @@ -61,23 +61,27 @@ public class TsFileRecoverPerformer { private LogReplayer logReplayer; private TsFileResource tsFileResource; private boolean acceptUnseq; + private boolean isLastFile; public TsFileRecoverPerformer(String logNodePrefix, Schema schema, VersionController versionController, - TsFileResource currentTsFileResource, boolean acceptUnseq) { + TsFileResource currentTsFileResource, boolean acceptUnseq, boolean isLastFile) { this.insertFilePath = currentTsFileResource.getFile().getPath(); this.logNodePrefix = logNodePrefix; this.schema = schema; this.versionController = versionController; this.tsFileResource = currentTsFileResource; this.acceptUnseq = acceptUnseq; + this.isLastFile = isLastFile; } /** * 1. recover the TsFile by RestorableTsFileIOWriter and truncate the file to remaining corrected * data 2. redo the WALs to recover unpersisted data 3. flush and close the file 4. 
clean WALs + * @return a RestorableTsFileIOWriter if the file is not closed before crash, so this writer + * can be used to continue writing */ - public void recover() throws StorageGroupProcessorException { + public RestorableTsFileIOWriter recover() throws StorageGroupProcessorException { IMemTable recoverMemTable = new PrimitiveMemTable(); this.logReplayer = new LogReplayer(logNodePrefix, insertFilePath, tsFileResource.getModFile(), @@ -86,7 +90,7 @@ public class TsFileRecoverPerformer { File insertFile = FSFactoryProducer.getFSFactory().getFile(insertFilePath); if (!insertFile.exists()) { logger.error("TsFile {} is missing, will skip its recovery.", insertFilePath); - return; + return null; } // remove corrupted part of the TsFile RestorableTsFileIOWriter restorableTsFileIOWriter; @@ -126,7 +130,7 @@ public class TsFileRecoverPerformer { // write .resource file tsFileResource.serialize(); } - return; + return null; } catch (IOException e) { throw new StorageGroupProcessorException( "recover the resource file failed: " + insertFilePath @@ -148,6 +152,8 @@ public class TsFileRecoverPerformer { } catch (IOException e) { throw new StorageGroupProcessorException(e); } + + return restorableTsFileIOWriter; } private void recoverResourceFromFile() throws IOException { @@ -210,8 +216,15 @@ public class TsFileRecoverPerformer { restorableTsFileIOWriter, tsFileResource.getFile().getParentFile().getName()); tableFlushTask.syncFlushMemTable(); } - // close file - restorableTsFileIOWriter.endFile(schema); + + if (!isLastFile || isLastFile && tsFileResource.isCloseFlagSet()) { + // end the file if it is not the last file or it is closed before crash + restorableTsFileIOWriter.endFile(schema); + tsFileResource.cleanCloseFlag(); + } + // otherwise this file is not closed before crash, do nothing so we can continue writing + // into it + tsFileResource.serialize(); } catch (IOException | InterruptedException | ExecutionException e) { throw new StorageGroupProcessorException(e); 
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java index cbfd1fa..70645b5 100644 --- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java +++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java @@ -20,6 +20,8 @@ package org.apache.iotdb.db.writelog.recover; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import java.io.File; import java.io.IOException; @@ -50,6 +52,7 @@ import org.apache.iotdb.tsfile.write.record.TSRecord; import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import org.apache.iotdb.tsfile.write.schema.Schema; +import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -136,11 +139,62 @@ public class SeqTsFileRecoverTest { } @Test - public void test() throws StorageGroupProcessorException, IOException { + public void testNonLastRecovery() throws StorageGroupProcessorException, IOException { TsFileRecoverPerformer performer = new TsFileRecoverPerformer(logNodePrefix, schema, - versionController, resource, true); + versionController, resource, true, false); ActiveTimeSeriesCounter.getInstance().init(logNodePrefix); - performer.recover(); + RestorableTsFileIOWriter writer = performer.recover(); + assertFalse(writer.canWrite()); + + assertEquals(2, (long) resource.getStartTimeMap().get("device99")); + assertEquals(100, (long) resource.getEndTimeMap().get("device99")); + for (int i = 0; i < 10; i++) { + assertEquals(0, (long) resource.getStartTimeMap().get("device" + i)); + assertEquals(19, (long) resource.getEndTimeMap().get("device" + i)); + } + + ReadOnlyTsFile readOnlyTsFile = new ReadOnlyTsFile(new 
TsFileSequenceReader(tsF.getPath())); + List<Path> pathList = new ArrayList<>(); + for (int j = 0; j < 10; j++) { + for (int k = 0; k < 10; k++) { + pathList.add(new Path("device" + j, "sensor" + k)); + } + } + QueryExpression queryExpression = QueryExpression.create(pathList, null); + QueryDataSet dataSet = readOnlyTsFile.query(queryExpression); + for (int i = 0; i < 20; i++) { + RowRecord record = dataSet.next(); + assertEquals(i, record.getTimestamp()); + List<Field> fields = record.getFields(); + assertEquals(100, fields.size()); + for (int j = 0; j < 100; j++) { + assertEquals(j % 10, fields.get(j).getLongV()); + } + } + + pathList = new ArrayList<>(); + pathList.add(new Path("device99", "sensor1")); + pathList.add(new Path("device99", "sensor4")); + queryExpression = QueryExpression.create(pathList, null); + dataSet = readOnlyTsFile.query(queryExpression); + Assert.assertTrue(dataSet.hasNext()); + RowRecord record = dataSet.next(); + Assert.assertEquals("2\t0\tnull", record.toString()); + Assert.assertTrue(dataSet.hasNext()); + record = dataSet.next(); + Assert.assertEquals("100\tnull\t0", record.toString()); + + readOnlyTsFile.close(); + } + + @Test + public void testLastRecovery() throws StorageGroupProcessorException, IOException { + TsFileRecoverPerformer performer = new TsFileRecoverPerformer(logNodePrefix, schema, + versionController, resource, true, true); + ActiveTimeSeriesCounter.getInstance().init(logNodePrefix); + RestorableTsFileIOWriter writer = performer.recover(); + assertTrue(writer.canWrite()); + writer.endFile(schema); assertEquals(2, (long) resource.getStartTimeMap().get("device99")); assertEquals(100, (long) resource.getEndTimeMap().get("device99")); diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java index 351228c..e68f4c4 100644 --- 
a/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java +++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java @@ -146,7 +146,7 @@ public class UnseqTsFileRecoverTest { @Test public void test() throws StorageGroupProcessorException, IOException { TsFileRecoverPerformer performer = new TsFileRecoverPerformer(logNodePrefix, schema, - versionController, resource, true); + versionController, resource, true, false); ActiveTimeSeriesCounter.getInstance().init(logNodePrefix); performer.recover();
