This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch tiered_storage
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/tiered_storage by this push:
new 13f09c2a594 tmp save for DataNode recover
13f09c2a594 is described below
commit 13f09c2a594c9f6b8698948aae9afc10654d23c9
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Mon May 22 23:07:00 2023 +0800
tmp save for DataNode recover
---
.../iotdb/os/HybridFileInputFactoryDecorator.java | 57 ++++++++++++++++++++++
.../apache/iotdb/os/conf/ObjectStorageConfig.java | 4 ++
server/pom.xml | 5 ++
.../iotdb/db/engine/storagegroup/DataRegion.java | 24 +++++++--
.../java/org/apache/iotdb/db/service/DataNode.java | 8 +++
.../file/AbstractTsFileRecoverPerformer.java | 9 ++--
.../iotdb/tsfile/fileSystem/FSFactoryProducer.java | 4 ++
7 files changed, 104 insertions(+), 7 deletions(-)
diff --git
a/object-storage/src/main/java/org/apache/iotdb/os/HybridFileInputFactoryDecorator.java
b/object-storage/src/main/java/org/apache/iotdb/os/HybridFileInputFactoryDecorator.java
new file mode 100644
index 00000000000..2f7e425ca60
--- /dev/null
+++
b/object-storage/src/main/java/org/apache/iotdb/os/HybridFileInputFactoryDecorator.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.os;
+
+import org.apache.iotdb.os.conf.ObjectStorageConfig;
+import org.apache.iotdb.os.conf.ObjectStorageDescriptor;
+import org.apache.iotdb.os.io.aws.AWSS3Config;
+import org.apache.iotdb.tsfile.fileSystem.fileInputFactory.FileInputFactory;
+import
org.apache.iotdb.tsfile.fileSystem.fileInputFactory.HybridFileInputFactory;
+import org.apache.iotdb.tsfile.read.reader.TsFileInput;
+import org.apache.iotdb.tsfile.utils.FSUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+
+/** Opens a TsFile locally, or from its object-storage path when the local file is absent. */
+public class HybridFileInputFactoryDecorator implements FileInputFactory {
+  private static final Logger logger =
+      LoggerFactory.getLogger(HybridFileInputFactoryDecorator.class);
+  private static final ObjectStorageConfig config =
+      ObjectStorageDescriptor.getInstance().getConfig();
+  private static final FileInputFactory fileInputFactory = new HybridFileInputFactory();
+  // Passed to FSUtils.parseLocalTsFile2OSFile to derive this node's object-storage path.
+  private final int dataNodeId;
+  public HybridFileInputFactoryDecorator(int dataNodeId) {
+    this.dataNodeId = dataNodeId;
+  }
+  /** Returns the input for {@code filePath}; IOException propagates from the delegate factory. */
+  @Override
+  public TsFileInput getTsFileInput(String filePath) throws IOException {
+    File file = new File(filePath);
+    if (!file.exists()) { // local copy absent: hand back the object-storage input, do not drop it
+      return fileInputFactory.getTsFileInput(
+          FSUtils.parseLocalTsFile2OSFile(file, AWSS3Config.getBucketName(), dataNodeId).getPath());
+    }
+    return fileInputFactory.getTsFileInput(filePath);
+  }
+}
diff --git
a/object-storage/src/main/java/org/apache/iotdb/os/conf/ObjectStorageConfig.java
b/object-storage/src/main/java/org/apache/iotdb/os/conf/ObjectStorageConfig.java
index 1375ada8b53..23724ca677f 100644
---
a/object-storage/src/main/java/org/apache/iotdb/os/conf/ObjectStorageConfig.java
+++
b/object-storage/src/main/java/org/apache/iotdb/os/conf/ObjectStorageConfig.java
@@ -50,6 +50,10 @@ public class ObjectStorageConfig {
return cacheDirs;
}
+ public String getBucketName() { // Returns the bucket name reported by AWSS3Config.
+ return AWSS3Config.getBucketName();
+ }
+
public void setCacheDirs(String[] cacheDirs) {
this.cacheDirs = cacheDirs;
}
diff --git a/server/pom.xml b/server/pom.xml
index fbda65880f3..4538676180e 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -38,6 +38,11 @@
<tomcat-embed-core.version>10.1.0-M1</tomcat-embed-core.version>
</properties>
<dependencies>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>os-tsfile</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>service-rpc</artifactId>
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index 850fda3ba78..cb488eaf301 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@ -140,6 +140,7 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.apache.iotdb.commons.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
+import static
org.apache.iotdb.db.engine.storagegroup.TsFileResource.RESOURCE_SUFFIX;
import static
org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX;
import static
org.apache.iotdb.db.mpp.metric.QueryResourceMetricSet.SEQUENCE_TSFILE;
import static
org.apache.iotdb.db.mpp.metric.QueryResourceMetricSet.UNSEQUENCE_TSFILE;
@@ -680,6 +681,8 @@ public class DataRegion implements IDataRegionForQuery {
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity
warning
private Pair<List<TsFileResource>, List<TsFileResource>>
getAllFiles(List<String> folders)
throws IOException, DataRegionException {
+ // represents local TsFile and remote TsFile on Object Storage
+ Set<String> tsFilePathSet = new HashSet<>();
List<File> tsFiles = new ArrayList<>();
List<File> upgradeFiles = new ArrayList<>();
for (String baseDir : folders) {
@@ -704,9 +707,24 @@ public class DataRegion implements IDataRegionForQuery {
// resources
continueFailedRenames(partitionFolder, TEMP_SUFFIX);
- Collections.addAll(
- tsFiles,
- fsFactory.listFilesBySuffix(partitionFolder.getAbsolutePath(),
TSFILE_SUFFIX));
+ File[] tsFilesInThisFolder =
+ fsFactory.listFilesBySuffix(partitionFolder.getAbsolutePath(),
TSFILE_SUFFIX);
+ File[] resourceFilesInThisFolder =
+ fsFactory.listFilesBySuffix(partitionFolder.getAbsolutePath(),
RESOURCE_SUFFIX);
+ for (File f : tsFilesInThisFolder) {
+ tsFilePathSet.add(f.getCanonicalPath());
+ }
+
+ Collections.addAll(tsFiles, tsFilesInThisFolder);
+ for (File f : resourceFilesInThisFolder) {
+ String tsFilePath =
+ f.getCanonicalPath()
+ .substring(0, f.getCanonicalPath().length() -
RESOURCE_SUFFIX.length());
+ if (!tsFilePathSet.contains(tsFilePath)) {
+ tsFiles.add(fsFactory.getFile(tsFilePath));
+ }
+ }
+
} else {
// collect old TsFiles for upgrading
Collections.addAll(
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 1e95ffe09f9..39d53476036 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -85,8 +85,10 @@ import org.apache.iotdb.db.wal.WALManager;
import org.apache.iotdb.db.wal.utils.WALMode;
import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
import org.apache.iotdb.metrics.utils.InternalReporterType;
+import org.apache.iotdb.os.HybridFileInputFactoryDecorator;
import org.apache.iotdb.pipe.api.exception.PipeManagementException;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.udf.api.exception.UDFManagementException;
import org.apache.thrift.TException;
@@ -393,6 +395,7 @@ public class DataNode implements DataNodeMBean {
/* Store runtime configurations when register success */
int dataNodeID = dataNodeRegisterResp.getDataNodeId();
config.setDataNodeId(dataNodeID);
+ configOSStorage(dataNodeID);
IoTDBStartCheck.getInstance()
.serializeClusterNameAndDataNodeId(config.getClusterName(),
dataNodeID);
@@ -407,6 +410,11 @@ public class DataNode implements DataNodeMBean {
}
}
+ private void configOSStorage(int dataNodeID) { // Installs a decorator built with dataNodeID as the file input factory.
+ FSFactoryProducer.setFileInputFactory(new
HybridFileInputFactoryDecorator(dataNodeID));
+ // recover OS cache (placeholder only: no recovery code follows in this method yet)
+ }
+
private void sendRestartRequestToConfigNode() throws StartupException {
logger.info("Sending restart request to ConfigNode-leader...");
diff --git
a/server/src/main/java/org/apache/iotdb/db/wal/recover/file/AbstractTsFileRecoverPerformer.java
b/server/src/main/java/org/apache/iotdb/db/wal/recover/file/AbstractTsFileRecoverPerformer.java
index 1d9047d349c..aa08daf4151 100644
---
a/server/src/main/java/org/apache/iotdb/db/wal/recover/file/AbstractTsFileRecoverPerformer.java
+++
b/server/src/main/java/org/apache/iotdb/db/wal/recover/file/AbstractTsFileRecoverPerformer.java
@@ -63,10 +63,6 @@ public abstract class AbstractTsFileRecoverPerformer
implements Closeable {
// delete chunk metadata temp file
FileUtils.delete(chunkMetadataTempFile);
}
- if (!tsFile.exists()) {
- logger.error("TsFile {} is missing, will skip its recovery.", tsFile);
- return;
- }
if (tsFileResource.resourceFileExists()) {
// .resource file exists, just deserialize it into memory
@@ -74,6 +70,11 @@ public abstract class AbstractTsFileRecoverPerformer
implements Closeable {
return;
}
+ if (!tsFile.exists()) {
+ logger.error("TsFile {} is missing, will skip its recovery.", tsFile);
+ return;
+ }
+
// try to remove corrupted part of the TsFile
try {
writer = new RestorableTsFileIOWriter(tsFile);
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/FSFactoryProducer.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/FSFactoryProducer.java
index 2a8711efe14..09413d318a4 100644
---
a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/FSFactoryProducer.java
+++
b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/FSFactoryProducer.java
@@ -39,6 +39,10 @@ public class FSFactoryProducer {
return fileInputFactory;
}
+ public static void setFileInputFactory(FileInputFactory fileInputFactory) {
+ FSFactoryProducer.fileInputFactory = fileInputFactory; // overwrites the class-level (static) factory field
+ }
+
public static FileOutputFactory getFileOutputFactory() {
return fileOutputFactory;
}