This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 0840b23644c Support upgrade to v1.4 with v1.3 wal
0840b23644c is described below
commit 0840b23644c785f4663b532edfba0f8febc5fdae
Author: Haonan <[email protected]>
AuthorDate: Thu Aug 15 08:44:57 2024 +0800
Support upgrade to v1.4 with v1.3 wal
---
.../dataregion/memtable/AbstractMemTable.java | 47 ++++++++++++
.../dataregion/wal/buffer/WALEntry.java | 3 +
.../dataregion/wal/buffer/WALEntryType.java | 7 +-
.../file/AbstractTsFileRecoverPerformer.java | 5 ++
.../file/UnsealedTsFileRecoverPerformer.java | 1 +
.../wal/recover/WALRecoverManagerTest.java | 82 +++++++++++++++++++++
.../src/test/resources/oldwal/1723544967972-1-0-0 | Bin 0 -> 237 bytes
.../datanode/src/test/resources/oldwal/_0-0-0.wal | Bin 0 -> 265150 bytes
.../src/test/resources/oldwal/_0.checkpoint | Bin 0 -> 533 bytes
.../datanode/src/test/resources/oldwal/_1-0-0.wal | Bin 0 -> 259747 bytes
.../datanode/src/test/resources/oldwal/_2-0-0.wal | Bin 0 -> 36948 bytes
.../datanode/src/test/resources/oldwal/_3-0-1.wal | Bin 0 -> 513 bytes
12 files changed, 143 insertions(+), 2 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
index 320b3a12efd..b1098f9858e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
+import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.db.schemaengine.schemaregion.utils.ResourceByPathUtils;
@@ -1096,6 +1097,37 @@ public abstract class AbstractMemTable implements IMemTable {
}
}
+ public void deserializeFromOldMemTableSnapshot(DataInputStream stream) throws IOException {
+ seriesNumber = stream.readInt();
+ memSize = stream.readLong();
+ tvListRamCost = stream.readLong();
+ totalPointsNum = stream.readLong();
+ totalPointsNumThreshold = stream.readLong();
+ maxPlanIndex = stream.readLong();
+ minPlanIndex = stream.readLong();
+
+ int memTableMapSize = stream.readInt();
+ for (int i = 0; i < memTableMapSize; ++i) {
+ PartialPath devicePath;
+ try {
+ devicePath =
+ DataNodeDevicePathCache.getInstance()
+ .getPartialPath(ReadWriteIOUtils.readString(stream));
+ } catch (IllegalPathException e) {
+ throw new IllegalArgumentException("Cannot deserialize OldMemTableSnapshot", e);
+ }
+ IDeviceID deviceID = deviceIDFactory.getDeviceID(devicePath);
+ boolean isAligned = ReadWriteIOUtils.readBool(stream);
+ IWritableMemChunkGroup memChunkGroup;
+ if (isAligned) {
+ memChunkGroup = AlignedWritableMemChunkGroup.deserialize(stream);
+ } else {
+ memChunkGroup = WritableMemChunkGroup.deserialize(stream);
+ }
+ memTableMap.put(deviceID, memChunkGroup);
+ }
+ }
+
@Override
public Map<IDeviceID, Long> getMaxTime() {
Map<IDeviceID, Long> latestTimeForEachDevice = new HashMap<>();
@@ -1129,6 +1161,21 @@ public abstract class AbstractMemTable implements IMemTable {
}
return memTable;
}
+
+ public static IMemTable createFromOldMemTableSnapshot(DataInputStream stream)
+ throws IOException {
+ boolean isSignal = ReadWriteIOUtils.readBool(stream);
+ IMemTable memTable;
+ if (isSignal) {
+ memTable = new NotifyFlushMemTable();
+ } else {
+ // database will be updated when deserialize
+ PrimitiveMemTable primitiveMemTable = new PrimitiveMemTable();
+ primitiveMemTable.deserializeFromOldMemTableSnapshot(stream);
+ memTable = primitiveMemTable;
+ }
+ return memTable;
+ }
}
@Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntry.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntry.java
index f9ea8fdb333..67030587922 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntry.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntry.java
@@ -110,6 +110,9 @@ public abstract class WALEntry implements SerializedSize {
case MEMORY_TABLE_SNAPSHOT:
value = AbstractMemTable.Factory.create(stream);
break;
+ case OLD_MEMORY_TABLE_SNAPSHOT:
+ value = AbstractMemTable.Factory.createFromOldMemTableSnapshot(stream);
+ break;
case INSERT_ROW_NODE:
value = (InsertRowNode) PlanNodeType.deserializeFromWAL(stream);
break;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntryType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntryType.java
index 914a2e90663..415bdaeb877 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntryType.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntryType.java
@@ -28,8 +28,9 @@ public enum WALEntryType {
INSERT_TABLET_PLAN((byte) 1),
@Deprecated
DELETE_PLAN((byte) 2),
- /** {@link org.apache.iotdb.db.engine.memtable.AbstractMemTable} */
- MEMORY_TABLE_SNAPSHOT((byte) 3),
+ @Deprecated
+ // memory tablet snapshot from 1.3 or lower version
+ OLD_MEMORY_TABLE_SNAPSHOT((byte) 3),
/** {@link org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode} */
INSERT_ROW_NODE((byte) 4),
/** {@link org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode} */
@@ -41,6 +42,8 @@ public enum WALEntryType {
/** {@link org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode} */
INSERT_ROWS_NODE((byte) 8),
CONTINUOUS_SAME_SEARCH_INDEX_SEPARATOR_NODE((byte) 9),
+ /** {@link org.apache.iotdb.db.storageengine.dataregion.memtable.AbstractMemTable} */
+ MEMORY_TABLE_SNAPSHOT((byte) 10),
// endregion
// region signal entry type
// signal wal buffer has been closed
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/AbstractTsFileRecoverPerformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/AbstractTsFileRecoverPerformer.java
index 0880697b02c..cec93e0ead4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/AbstractTsFileRecoverPerformer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/AbstractTsFileRecoverPerformer.java
@@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
+import java.nio.file.Files;
/** This class is used to help recover TsFile. */
public abstract class AbstractTsFileRecoverPerformer implements Closeable {
@@ -85,6 +86,10 @@ public abstract class AbstractTsFileRecoverPerformer implements Closeable {
boolean result = tsFile.delete();
logger.warn(
"TsFile {} is incompatible. Try to delete it and delete result is {}", tsFile, result);
+ // if the broken TsFile is v3, we can recover the all data from wal
+ // to support it, we can regenerate an empty file here
+ Files.createFile(tsFile.toPath());
+ writer = new RestorableTsFileIOWriter(tsFile);
throw new DataRegionException(e);
} catch (IOException e) {
throw new DataRegionException(e);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformer.java
index 71ff9417e91..a09c473e16e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformer.java
@@ -190,6 +190,7 @@ public class UnsealedTsFileRecoverPerformer extends AbstractTsFileRecoverPerform
try {
switch (walEntry.getType()) {
case MEMORY_TABLE_SNAPSHOT:
+ case OLD_MEMORY_TABLE_SNAPSHOT:
IMemTable memTable = (IMemTable) walEntry.getValue();
if (!memTable.isSignalMemTable()) {
if (tsFileResource != null) {
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverManagerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverManagerTest.java
index dace1a99ba6..dc47ec84219 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverManagerTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverManagerTest.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.query.QueryProcessException;
@@ -73,6 +74,7 @@ import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;
+import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
@@ -351,6 +353,86 @@ public class WALRecoverManagerTest {
// endregion
}
+ @Test
+ public void testRecoverOldWalWithEmptyTsFile() throws Exception {
+ // old version of wal is generated by prepareCheckpointAndWALFileForSnapshot() in v1.3
+ String oldWalPathStr = this.getClass().getClassLoader().getResource("oldwal").getFile();
+ File oldWalFileDir = new File(oldWalPathStr);
+ FileUtils.copyDir(oldWalFileDir, new File(WAL_NODE_FOLDER));
+ WALRecoverManager.getInstance().clear();
+ recoverFromOldWalAndCheck(false);
+ }
+
+ @Test
+ public void testRecoverOldWalWithBrokenTsFile() throws Exception {
+ // old version of wal is generated by prepareCheckpointAndWALFileForSnapshot() in v1.3
+ String oldWalPathStr = this.getClass().getClassLoader().getResource("oldwal").getFile();
+ File oldWalFileDir = new File(oldWalPathStr);
+ FileUtils.copyDir(oldWalFileDir, new File(WAL_NODE_FOLDER));
+ WALRecoverManager.getInstance().clear();
+ recoverFromOldWalAndCheck(true);
+ }
+
+ private void recoverFromOldWalAndCheck(boolean withBrokenTsFile) throws Exception {
+ // prepare tsFiles
+ List<WALRecoverListener> recoverListeners = new ArrayList<>();
+
+ // prepare file with wal
+ File fileWithWALDir = new File(FILE_WITH_WAL_NAME).getParentFile();
+ Files.createDirectories(fileWithWALDir.toPath());
+ File fileWithWAL = new File(fileWithWALDir, "1723544967972-1-0-0.tsfile");
+ if (withBrokenTsFile) {
+ // copy a broken TsFileV3
+ String oldWalPathStr = this.getClass().getClassLoader().getResource("oldwal").getFile();
+ File oldWalFileDir = new File(oldWalPathStr);
+ FileUtils.copyFile(new File(oldWalFileDir, "1723544967972-1-0-0"), fileWithWAL);
+ } else {
+ // create an empty tsfile
+ Files.createFile(fileWithWAL.toPath());
+ }
+ tsFileWithWALResource = new TsFileResource(fileWithWAL);
+ UnsealedTsFileRecoverPerformer recoverPerformer =
+ new UnsealedTsFileRecoverPerformer(
+ tsFileWithWALResource, true, performer -> assertFalse(performer.canWrite()));
+ recoverManager.addRecoverPerformer(recoverPerformer);
+ recoverListeners.add(recoverPerformer.getRecoverListener());
+ // recover
+ recoverManager.setAllDataRegionScannedLatch(new ExceptionalCountDownLatch(0));
+ recoverManager.recover();
+ // check recover listeners
+ try {
+ for (WALRecoverListener recoverListener : recoverListeners) {
+ assertEquals(WALRecoverListener.Status.SUCCESS, recoverListener.waitForResult());
+ }
+ } catch (NullPointerException e) {
+ // ignore
+ }
+ // region check file with wal
+ // check file content
+ TsFileSequenceReader reader = new TsFileSequenceReader(fileWithWAL.getPath());
+ List<ChunkMetadata> chunkMetadataList =
+ reader.getChunkMetadataList(new Path(DEVICE1_NAME, "s1", true));
+ assertNotNull(chunkMetadataList);
+ chunkMetadataList = reader.getChunkMetadataList(new Path(DEVICE1_NAME, "s2", true));
+ assertNotNull(chunkMetadataList);
+ chunkMetadataList = reader.getChunkMetadataList(new Path(DEVICE2_NAME, "s1", true));
+ assertNotNull(chunkMetadataList);
+ chunkMetadataList = reader.getChunkMetadataList(new Path(DEVICE2_NAME, "s2", true));
+ assertNotNull(chunkMetadataList);
+ assertEquals(1, chunkMetadataList.size());
+ Chunk chunk = reader.readMemChunk(chunkMetadataList.get(0));
+ assertEquals(15, chunk.getChunkStatistic().getEndTime());
+ reader.close();
+ // check .resource file in memory
+ assertEquals(4, tsFileWithWALResource.getStartTime(DEVICE2_NAME));
+ assertEquals(15, tsFileWithWALResource.getEndTime(DEVICE2_NAME));
+ // check file existence
+ assertTrue(fileWithWAL.exists());
+ assertTrue(new File(fileWithWAL.getPath().concat(TsFileResource.RESOURCE_SUFFIX)).exists());
+ // endregion
+
+ }
+
private InsertRowNode getInsertRowNode(String devicePath, long time)
throws MetadataException, QueryProcessException {
TSDataType[] dataTypes = new TSDataType[] {TSDataType.FLOAT, TSDataType.DOUBLE};
diff --git a/iotdb-core/datanode/src/test/resources/oldwal/1723544967972-1-0-0 b/iotdb-core/datanode/src/test/resources/oldwal/1723544967972-1-0-0
new file mode 100644
index 00000000000..f41e64de04b
Binary files /dev/null and b/iotdb-core/datanode/src/test/resources/oldwal/1723544967972-1-0-0 differ
diff --git a/iotdb-core/datanode/src/test/resources/oldwal/_0-0-0.wal b/iotdb-core/datanode/src/test/resources/oldwal/_0-0-0.wal
new file mode 100644
index 00000000000..49e504082d7
Binary files /dev/null and b/iotdb-core/datanode/src/test/resources/oldwal/_0-0-0.wal differ
diff --git a/iotdb-core/datanode/src/test/resources/oldwal/_0.checkpoint b/iotdb-core/datanode/src/test/resources/oldwal/_0.checkpoint
new file mode 100644
index 00000000000..6d1291c2867
Binary files /dev/null and b/iotdb-core/datanode/src/test/resources/oldwal/_0.checkpoint differ
diff --git a/iotdb-core/datanode/src/test/resources/oldwal/_1-0-0.wal b/iotdb-core/datanode/src/test/resources/oldwal/_1-0-0.wal
new file mode 100644
index 00000000000..a092cf71d5e
Binary files /dev/null and b/iotdb-core/datanode/src/test/resources/oldwal/_1-0-0.wal differ
diff --git a/iotdb-core/datanode/src/test/resources/oldwal/_2-0-0.wal b/iotdb-core/datanode/src/test/resources/oldwal/_2-0-0.wal
new file mode 100644
index 00000000000..71c6581cad6
Binary files /dev/null and b/iotdb-core/datanode/src/test/resources/oldwal/_2-0-0.wal differ
diff --git a/iotdb-core/datanode/src/test/resources/oldwal/_3-0-1.wal b/iotdb-core/datanode/src/test/resources/oldwal/_3-0-1.wal
new file mode 100644
index 00000000000..00041c3d9fc
Binary files /dev/null and b/iotdb-core/datanode/src/test/resources/oldwal/_3-0-1.wal differ