This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.12 by this push:
new 3f205e1 [To rel/0.12][IOTDB-1877] Fix Sync recovery and reconnection
bugs in both sender and receiver (#4217)
3f205e1 is described below
commit 3f205e18f21b3dfb673df4f5bae50eab4e919d2c
Author: yschengzi <[email protected]>
AuthorDate: Wed Oct 27 17:28:26 2021 +0800
[To rel/0.12][IOTDB-1877] Fix Sync recovery and reconnection bugs in both
sender and receiver (#4217)
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 14 +++++++++++++
.../db/sync/receiver/transfer/SyncServiceImpl.java | 23 ++++------------------
.../iotdb/db/sync/sender/transfer/SyncClient.java | 13 +++++++-----
3 files changed, 26 insertions(+), 24 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 3688151..f5845c5 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1170,6 +1170,20 @@ public class IoTDBConfig {
this.languageVersion = languageVersion;
}
+ public String getIoTDBVersion() {
+ return IoTDBConstant.VERSION;
+ }
+
+ public String getIoTDBMajorVersion() {
+ return IoTDBConstant.MAJOR_VERSION;
+ }
+
+ public String getIoTDBMajorVersion(String version) {
+ return version.equals("UNKNOWN")
+ ? "UNKNOWN"
+ : version.split("\\.")[0] + "." + version.split("\\.")[1];
+ }
+
public String getIpWhiteList() {
return ipWhiteList;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
index 7ad15ba..ec38878 100644
---
a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
@@ -20,7 +20,6 @@ package org.apache.iotdb.db.sync.receiver.transfer;
import org.apache.iotdb.db.concurrent.ThreadName;
import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.directories.DirectoryManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -78,11 +77,11 @@ public class SyncServiceImpl implements SyncService.Iface {
public SyncStatus check(ConfirmInfo info) {
String ipAddress = info.address, uuid = info.uuid;
Thread.currentThread().setName(ThreadName.SYNC_SERVER.getName());
- if (!getMajorVersion(info.version).equals(IoTDBConstant.MAJOR_VERSION)) {
+ if
(!config.getIoTDBMajorVersion(info.version).equals(config.getIoTDBMajorVersion()))
{
return getErrorResult(
String.format(
"Version mismatch: the sender <%s>, the receiver <%s>",
- info.version, IoTDBConstant.MAJOR_VERSION));
+ info.version, config.getIoTDBVersion()));
}
if (info.partitionInterval
!= IoTDBDescriptor.getInstance().getConfig().getPartitionInterval()) {
@@ -106,12 +105,6 @@ public class SyncServiceImpl implements SyncService.Iface {
}
}
- private String getMajorVersion(String version) {
- return version.equals("UNKNOWN")
- ? "UNKNOWN"
- : version.split("\\.")[0] + "." + version.split("\\.")[1];
- }
-
private boolean checkRecovery() {
try {
if (currentFileWriter.get() != null) {
@@ -173,6 +166,7 @@ public class SyncServiceImpl implements SyncService.Iface {
public SyncStatus syncDeletedFileName(String fileInfo) {
String filePath = currentSG.get() + File.separator +
getFilePathByFileInfo(fileInfo);
try {
+ syncLog.get().startSyncDeletedFilesName();
syncLog.get().finishSyncDeletedFileName(new File(getSyncDataPath(),
filePath));
FileLoaderManager.getInstance()
.getFileLoader(senderName.get())
@@ -199,15 +193,13 @@ public class SyncServiceImpl implements SyncService.Iface
{
public SyncStatus initSyncData(String fileInfo) {
File file;
String filePath = fileInfo;
- logger.info("initSyncData receive: " + fileInfo); // change back
try {
- if (currentSG.get() == null) { // schema mlog.txt file
+ if (fileInfo.equals(MetadataConstant.METADATA_LOG)) { // schema mlog.txt
file
file = new File(getSyncDataPath(), filePath);
} else {
filePath = currentSG.get() + File.separator +
getFilePathByFileInfo(fileInfo);
file = new File(getSyncDataPath(), filePath);
}
- logger.info("initSyncData file: " + file); // change back
file.delete();
currentFile.set(file);
if (!file.getParentFile().exists()) {
@@ -219,8 +211,6 @@ public class SyncServiceImpl implements SyncService.Iface {
currentFileWriter.set(new FileOutputStream(file));
syncLog.get().startSyncTsFiles();
messageDigest.set(MessageDigest.getInstance(SyncConstant.MESSAGE_DIGIT_NAME));
- logger.info(
- "initSyncData currentFileWriter: " +
currentFileWriter.get().toString()); // change back
} catch (IOException | NoSuchAlgorithmException e) {
logger.error("Can not init sync resource for file {}", filePath, e);
return getErrorResult(
@@ -234,11 +224,6 @@ public class SyncServiceImpl implements SyncService.Iface {
public SyncStatus syncData(ByteBuffer buff) {
try {
int pos = buff.position();
- logger.info("syncData currentSG: " + currentSG.get()); // change back
- logger.info("syncData currentFileWriter: " +
currentFileWriter.get().toString());
- logger.info(
- "syncData currentFileWriterChannel is Open: "
- + currentFileWriter.get().getChannel().isOpen());
currentFileWriter.get().getChannel().write(buff);
buff.position(pos);
messageDigest.get().update(buff);
diff --git
a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
index 06ed0b6..d197f9f 100644
---
a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
+++
b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.sync.sender.transfer;
import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.concurrent.ThreadName;
import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.SyncConnectionException;
@@ -265,12 +264,12 @@ public class SyncClient implements ISyncClient {
dataDirs.length);
config.update(dataDir);
+ checkRecovery();
syncFileManager.getValidFiles(dataDir);
allSG = syncFileManager.getAllSGs();
lastLocalFilesMap = syncFileManager.getLastLocalFilesMap();
deletedFilesMap = syncFileManager.getDeletedFilesMap();
toBeSyncedFilesMap = syncFileManager.getToBeSyncedFilesMap();
- checkRecovery();
if (SyncUtils.isEmpty(deletedFilesMap) &&
SyncUtils.isEmpty(toBeSyncedFilesMap)) {
logger.info("There has no data to sync in data dir {}", dataDir);
continue;
@@ -333,7 +332,9 @@ public class SyncClient implements ISyncClient {
try {
establishConnection(config.getServerIp(), config.getServerPort());
- } catch (SyncConnectionException e) {
+ confirmIdentity();
+ serviceClient.startSync();
+ } catch (SyncConnectionException | TException e) {
logger.warn("Can not reconnect to receiver {}. Caused by ",
config.getSyncReceiverName(), e);
return false;
}
@@ -348,7 +349,7 @@ public class SyncClient implements ISyncClient {
socket.getLocalAddress().getHostAddress(),
getOrCreateUUID(getUuidFile()),
ioTDBConfig.getPartitionInterval(),
- IoTDBConstant.MAJOR_VERSION);
+ ioTDBConfig.getIoTDBMajorVersion());
SyncStatus status = serviceClient.check(info);
if (status.code != SUCCESS_CODE) {
throw new SyncConnectionException(
@@ -396,7 +397,6 @@ public class SyncClient implements ISyncClient {
return;
}
int retryCount = 0;
- serviceClient.initSyncData(MetadataConstant.METADATA_LOG);
while (true) {
if (retryCount > config.getMaxNumOfSyncFileRetry()) {
throw new SyncConnectionException(
@@ -421,6 +421,8 @@ public class SyncClient implements ISyncClient {
// start to sync file data and get digest of this file.
try (FileInputStream fis = new FileInputStream(getSchemaLogFile());
ByteArrayOutputStream bos = new
ByteArrayOutputStream(SyncConstant.DATA_CHUNK_SIZE)) {
+ serviceClient.initSyncData(MetadataConstant.METADATA_LOG);
+
long skipNum = fis.skip(schemaFilePos);
if (skipNum != schemaFilePos) {
logger.warn(
@@ -594,6 +596,7 @@ public class SyncClient implements ISyncClient {
}
} catch (TException e) {
logger.error("Can not sync deleted file name {}, skip it.", file);
+ isSyncConnect = false;
}
}
logger.info("Finish to sync names of deleted files in storage group {}",
sgName);