This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch continue_file_after_recovery
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 55ab7868ad5ff65b6d496f8217a8d029899c55bf
Author: jt2594838 <[email protected]>
AuthorDate: Tue Dec 31 20:55:55 2019 +0800

    continue writing the last unclosed file
---
 .../engine/storagegroup/StorageGroupProcessor.java | 39 ++++++++++----
 .../db/engine/storagegroup/TsFileProcessor.java    | 17 +++++-
 .../db/engine/storagegroup/TsFileResource.java     | 29 +++++++++++
 .../writelog/recover/TsFileRecoverPerformer.java   | 25 ++++++---
 .../db/writelog/recover/SeqTsFileRecoverTest.java  | 60 ++++++++++++++++++++--
 .../writelog/recover/UnseqTsFileRecoverTest.java   |  2 +-
 6 files changed, 151 insertions(+), 21 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 741b902..0089895 100755
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -85,6 +85,7 @@ import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.Schema;
+import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -287,24 +288,42 @@ public class StorageGroupProcessor {
   }
 
   private void recoverSeqFiles(List<TsFileResource> tsFiles) throws 
StorageGroupProcessorException {
-    for (TsFileResource tsFileResource : tsFiles) {
+    for (int i = 0; i < tsFiles.size(); i++) {
+      TsFileResource tsFileResource = tsFiles.get(i);
       sequenceFileList.add(tsFileResource);
       TsFileRecoverPerformer recoverPerformer = new 
TsFileRecoverPerformer(storageGroupName + "-"
-          , schema, versionController, tsFileResource, false);
-      recoverPerformer.recover();
-      tsFileResource.setClosed(true);
+          , schema, versionController, tsFileResource, false, i == 
tsFiles.size() - 1);
+      RestorableTsFileIOWriter writer = recoverPerformer.recover();
+      if (i != tsFiles.size() - 1) {
+        // not the last file, just close it
+        tsFileResource.setClosed(true);
+      } else if (writer.canWrite()) {
+        // the last file is not closed, continue writing to in
+        workSequenceTsFileProcessor = new TsFileProcessor(storageGroupName, 
tsFileResource,
+            schema, versionController, this::closeUnsealedTsFileProcessor,
+            this::updateLatestFlushTimeCallback, true, writer);
+      }
     }
   }
 
   private void recoverUnseqFiles(List<TsFileResource> tsFiles)
       throws StorageGroupProcessorException {
-    for (TsFileResource tsFileResource : tsFiles) {
+    for (int i = 0; i < tsFiles.size(); i++) {
+      TsFileResource tsFileResource = tsFiles.get(i);
       unSequenceFileList.add(tsFileResource);
-      TsFileRecoverPerformer recoverPerformer = new 
TsFileRecoverPerformer(storageGroupName + "-",
-          schema,
-          versionController, tsFileResource, true);
-      recoverPerformer.recover();
-      tsFileResource.setClosed(true);
+      TsFileRecoverPerformer recoverPerformer = new 
TsFileRecoverPerformer(storageGroupName + "-"
+          , schema, versionController, tsFileResource, true, i == 
tsFiles.size() - 1);
+      RestorableTsFileIOWriter writer = recoverPerformer.recover();
+      if (i != tsFiles.size() - 1) {
+        // not the last file, just close it
+        tsFileResource.setClosed(true);
+      } else if (writer.canWrite()) {
+        // the last file is not closed, continue writing to in
+        workUnSequenceTsFileProcessor = new TsFileProcessor(storageGroupName, 
tsFileResource,
+            schema, versionController, this::closeUnsealedTsFileProcessor,
+            () -> true, false, writer);
+      }
+
     }
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index eec8828..26698a7 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -48,7 +48,6 @@ import 
org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.CloseTsFile
 import org.apache.iotdb.db.engine.version.VersionController;
 import org.apache.iotdb.db.exception.TsFileProcessorException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.qp.constant.DatetimeUtils;
 import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
@@ -133,6 +132,20 @@ public class TsFileProcessor {
     logger.info("create a new tsfile processor {}", tsfile.getAbsolutePath());
   }
 
+  public TsFileProcessor(String storageGroupName, TsFileResource 
tsFileResource, Schema schema,
+      VersionController versionController, CloseTsFileCallBack 
closeUnsealedTsFileProcessor,
+      Supplier updateLatestFlushTimeCallback, boolean sequence, 
RestorableTsFileIOWriter writer) {
+    this.storageGroupName =storageGroupName;
+    this.tsFileResource = tsFileResource;
+    this.schema = schema;
+    this.versionController = versionController;
+    this.writer = writer;
+    this.closeTsFileCallback = closeUnsealedTsFileProcessor;
+    this.updateLatestFlushTimeCallback = updateLatestFlushTimeCallback;
+    this.sequence = sequence;
+    logger.info("reopen a tsfile processor {}", tsFileResource.getFile());
+  }
+
   /**
    * insert data in an InsertPlan into the workingMemtable.
    *
@@ -284,6 +297,7 @@ public class TsFileProcessor {
         return;
       }
       shouldClose = true;
+      tsFileResource.setCloseFlag();
       // when a flush thread serves this TsFileProcessor (because the 
processor is submitted by
       // registerTsFileProcessor()), the thread will seal the corresponding 
TsFile and
       // execute other cleanup works if (shouldClose == true and 
flushingMemTables is empty).
@@ -492,6 +506,7 @@ public class TsFileProcessor {
 
     tsFileResource.serialize();
     writer.endFile(schema);
+    tsFileResource.cleanCloseFlag();
 
     // remove this processor from Closing list in StorageGroupProcessor,
     // mark the TsFileResource closed, no need writer anymore
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index 32817cf..656851e 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -39,14 +39,19 @@ import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
 import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
 import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class TsFileResource {
 
+  private static final Logger logger = 
LoggerFactory.getLogger(TsFileResource.class);
+
   // tsfile
   private File file;
 
   public static final String RESOURCE_SUFFIX = ".resource";
   static final String TEMP_SUFFIX = ".temp";
+  private static final String CLOSING_SUFFIX = ".closing";
 
   /**
    * device -> start time
@@ -327,4 +332,28 @@ public class TsFileResource {
     }
     return false;
   }
+
+  /**
+   * set a file flag indicating that the file is being closed, so during 
recovery we could know
+   * we should close the file.
+   */
+  public void setCloseFlag() {
+    try {
+      new File(file.getAbsoluteFile() + CLOSING_SUFFIX).createNewFile();
+    } catch (IOException e) {
+      logger.error("Cannot create close flag for {}", file, e);
+    }
+  }
+
+  /**
+   * clean the close flag when the file is successfully closed.
+   */
+  public void cleanCloseFlag() {
+    new File(file.getAbsoluteFile() + CLOSING_SUFFIX).delete();
+  }
+
+  public boolean isCloseFlagSet() {
+    return new File(file.getAbsoluteFile() + CLOSING_SUFFIX).exists();
+  }
+
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
 
b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
index 148cf73..2faac3b 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
@@ -61,23 +61,27 @@ public class TsFileRecoverPerformer {
   private LogReplayer logReplayer;
   private TsFileResource tsFileResource;
   private boolean acceptUnseq;
+  private boolean isLastFile;
 
   public TsFileRecoverPerformer(String logNodePrefix,
       Schema schema, VersionController versionController,
-      TsFileResource currentTsFileResource, boolean acceptUnseq) {
+      TsFileResource currentTsFileResource, boolean acceptUnseq, boolean 
isLastFile) {
     this.insertFilePath = currentTsFileResource.getFile().getPath();
     this.logNodePrefix = logNodePrefix;
     this.schema = schema;
     this.versionController = versionController;
     this.tsFileResource = currentTsFileResource;
     this.acceptUnseq = acceptUnseq;
+    this.isLastFile = isLastFile;
   }
 
   /**
    * 1. recover the TsFile by RestorableTsFileIOWriter and truncate the file 
to remaining corrected
    * data 2. redo the WALs to recover unpersisted data 3. flush and close the 
file 4. clean WALs
+   * @return a RestorableTsFileIOWriter if the file is not closed before 
crush, so this writer
+   * can be used to continue writing
    */
-  public void recover() throws StorageGroupProcessorException {
+  public RestorableTsFileIOWriter recover() throws 
StorageGroupProcessorException {
 
     IMemTable recoverMemTable = new PrimitiveMemTable();
     this.logReplayer = new LogReplayer(logNodePrefix, insertFilePath, 
tsFileResource.getModFile(),
@@ -86,7 +90,7 @@ public class TsFileRecoverPerformer {
     File insertFile = FSFactoryProducer.getFSFactory().getFile(insertFilePath);
     if (!insertFile.exists()) {
       logger.error("TsFile {} is missing, will skip its recovery.", 
insertFilePath);
-      return;
+      return null;
     }
     // remove corrupted part of the TsFile
     RestorableTsFileIOWriter restorableTsFileIOWriter;
@@ -126,7 +130,7 @@ public class TsFileRecoverPerformer {
           // write .resource file
           tsFileResource.serialize();
         }
-        return;
+        return null;
       } catch (IOException e) {
         throw new StorageGroupProcessorException(
             "recover the resource file failed: " + insertFilePath
@@ -148,6 +152,8 @@ public class TsFileRecoverPerformer {
     } catch (IOException e) {
       throw new StorageGroupProcessorException(e);
     }
+
+    return restorableTsFileIOWriter;
   }
 
   private void recoverResourceFromFile() throws IOException {
@@ -210,8 +216,15 @@ public class TsFileRecoverPerformer {
             restorableTsFileIOWriter, 
tsFileResource.getFile().getParentFile().getName());
         tableFlushTask.syncFlushMemTable();
       }
-      // close file
-      restorableTsFileIOWriter.endFile(schema);
+
+      if (!isLastFile || isLastFile && tsFileResource.isCloseFlagSet()) {
+        // end the file if it is not the last file or it is closed before crush
+        restorableTsFileIOWriter.endFile(schema);
+        tsFileResource.cleanCloseFlag();
+      }
+      // otherwise this file is not closed before crush, do nothing so we can 
continue writing
+      // into it
+
       tsFileResource.serialize();
     } catch (IOException | InterruptedException | ExecutionException e) {
       throw new StorageGroupProcessorException(e);
diff --git 
a/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
 
b/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
index cbfd1fa..70645b5 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
@@ -20,6 +20,8 @@
 package org.apache.iotdb.db.writelog.recover;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.IOException;
@@ -50,6 +52,7 @@ import org.apache.iotdb.tsfile.write.record.TSRecord;
 import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.Schema;
+import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -136,11 +139,62 @@ public class SeqTsFileRecoverTest {
   }
 
   @Test
-  public void test() throws StorageGroupProcessorException, IOException {
+  public void testNonLastRecovery() throws StorageGroupProcessorException, 
IOException {
     TsFileRecoverPerformer performer = new 
TsFileRecoverPerformer(logNodePrefix, schema,
-        versionController, resource, true);
+        versionController, resource, true, false);
     ActiveTimeSeriesCounter.getInstance().init(logNodePrefix);
-    performer.recover();
+    RestorableTsFileIOWriter writer = performer.recover();
+    assertFalse(writer.canWrite());
+
+    assertEquals(2, (long) resource.getStartTimeMap().get("device99"));
+    assertEquals(100, (long) resource.getEndTimeMap().get("device99"));
+    for (int i = 0; i < 10; i++) {
+      assertEquals(0, (long) resource.getStartTimeMap().get("device" + i));
+      assertEquals(19, (long) resource.getEndTimeMap().get("device" + i));
+    }
+
+    ReadOnlyTsFile readOnlyTsFile = new ReadOnlyTsFile(new 
TsFileSequenceReader(tsF.getPath()));
+    List<Path> pathList = new ArrayList<>();
+    for (int j = 0; j < 10; j++) {
+      for (int k = 0; k < 10; k++) {
+        pathList.add(new Path("device" + j, "sensor" + k));
+      }
+    }
+    QueryExpression queryExpression = QueryExpression.create(pathList, null);
+    QueryDataSet dataSet = readOnlyTsFile.query(queryExpression);
+    for (int i = 0; i < 20; i++) {
+      RowRecord record = dataSet.next();
+      assertEquals(i, record.getTimestamp());
+      List<Field> fields = record.getFields();
+      assertEquals(100, fields.size());
+      for (int j = 0; j < 100; j++) {
+        assertEquals(j % 10, fields.get(j).getLongV());
+      }
+    }
+
+    pathList = new ArrayList<>();
+    pathList.add(new Path("device99", "sensor1"));
+    pathList.add(new Path("device99", "sensor4"));
+    queryExpression = QueryExpression.create(pathList, null);
+    dataSet = readOnlyTsFile.query(queryExpression);
+    Assert.assertTrue(dataSet.hasNext());
+    RowRecord record = dataSet.next();
+    Assert.assertEquals("2\t0\tnull", record.toString());
+    Assert.assertTrue(dataSet.hasNext());
+    record = dataSet.next();
+    Assert.assertEquals("100\tnull\t0", record.toString());
+
+    readOnlyTsFile.close();
+  }
+
+  @Test
+  public void testLastRecovery() throws StorageGroupProcessorException, 
IOException {
+    TsFileRecoverPerformer performer = new 
TsFileRecoverPerformer(logNodePrefix, schema,
+        versionController, resource, true, true);
+    ActiveTimeSeriesCounter.getInstance().init(logNodePrefix);
+    RestorableTsFileIOWriter writer = performer.recover();
+    assertTrue(writer.canWrite());
+    writer.endFile(schema);
 
     assertEquals(2, (long) resource.getStartTimeMap().get("device99"));
     assertEquals(100, (long) resource.getEndTimeMap().get("device99"));
diff --git 
a/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
 
b/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
index 351228c..e68f4c4 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
@@ -146,7 +146,7 @@ public class UnseqTsFileRecoverTest {
   @Test
   public void test() throws StorageGroupProcessorException, IOException {
     TsFileRecoverPerformer performer = new 
TsFileRecoverPerformer(logNodePrefix, schema,
-        versionController, resource, true);
+        versionController, resource, true, false);
     ActiveTimeSeriesCounter.getInstance().init(logNodePrefix);
     performer.recover();
 

Reply via email to