This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.12 by this push:
new abf4f1b [To rel/0.12][IOTDB-1731]Fix sync error between different os (#4025)
abf4f1b is described below
commit abf4f1badd9b66217aeccaad93e596a6df5701fd
Author: yschengzi <[email protected]>
AuthorDate: Mon Oct 18 09:09:45 2021 +0800
[To rel/0.12][IOTDB-1731]Fix sync error between different os (#4025)
---
.../db/sync/receiver/transfer/SyncServiceImpl.java | 53 ++++++++++++++++------
.../iotdb/db/sync/sender/transfer/SyncClient.java | 19 ++++----
2 files changed, 46 insertions(+), 26 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
index 832d714..7ad15ba 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
@@ -78,7 +78,7 @@ public class SyncServiceImpl implements SyncService.Iface {
public SyncStatus check(ConfirmInfo info) {
String ipAddress = info.address, uuid = info.uuid;
Thread.currentThread().setName(ThreadName.SYNC_SERVER.getName());
- if (!info.version.equals(IoTDBConstant.MAJOR_VERSION)) {
+ if (!getMajorVersion(info.version).equals(IoTDBConstant.MAJOR_VERSION)) {
return getErrorResult(
String.format(
"Version mismatch: the sender <%s>, the receiver <%s>",
@@ -106,6 +106,12 @@ public class SyncServiceImpl implements SyncService.Iface {
}
}
+ private String getMajorVersion(String version) {
+ return version.equals("UNKNOWN")
+ ? "UNKNOWN"
+ : version.split("\\.")[0] + "." + version.split("\\.")[1];
+ }
+
private boolean checkRecovery() {
try {
if (currentFileWriter.get() != null) {
@@ -164,34 +170,44 @@ public class SyncServiceImpl implements SyncService.Iface {
}
@Override
- public SyncStatus syncDeletedFileName(String fileName) {
+ public SyncStatus syncDeletedFileName(String fileInfo) {
+ String filePath = currentSG.get() + File.separator + getFilePathByFileInfo(fileInfo);
try {
- syncLog
- .get()
- .finishSyncDeletedFileName(
- new File(getSyncDataPath(), currentSG.get() + File.separatorChar + fileName));
+ syncLog.get().finishSyncDeletedFileName(new File(getSyncDataPath(), filePath));
FileLoaderManager.getInstance()
.getFileLoader(senderName.get())
- .addDeletedFileName(
- new File(getSyncDataPath(), currentSG.get() + File.separatorChar + fileName));
+ .addDeletedFileName(new File(getSyncDataPath(), filePath));
} catch (IOException e) {
logger.error("Can not sync deleted file", e);
return getErrorResult(
- String.format("Can not sync deleted file %s because %s", fileName, e.getMessage()));
+ String.format("Can not sync deleted file %s because %s", filePath, e.getMessage()));
}
return getSuccessResult();
}
+ private String getFilePathByFileInfo(String fileInfo) { // for different os
+ String filePath = "";
+ String[] fileInfos = fileInfo.split(SyncConstant.SYNC_DIR_NAME_SEPARATOR);
+ for (int i = 0; i < fileInfos.length - 1; i++) {
+ filePath += fileInfos[i] + File.separator;
+ }
+ return filePath + fileInfos[fileInfos.length - 1];
+ }
+
@SuppressWarnings("squid:S2095") // Suppress unclosed resource warning
@Override
- public SyncStatus initSyncData(String filename) {
+ public SyncStatus initSyncData(String fileInfo) {
+ File file;
+ String filePath = fileInfo;
+ logger.info("initSyncData receive: " + fileInfo); // change back
try {
- File file;
if (currentSG.get() == null) { // schema mlog.txt file
- file = new File(getSyncDataPath(), filename);
+ file = new File(getSyncDataPath(), filePath);
} else {
- file = new File(getSyncDataPath(), currentSG.get() + File.separatorChar + filename);
+ filePath = currentSG.get() + File.separator + getFilePathByFileInfo(fileInfo);
+ file = new File(getSyncDataPath(), filePath);
}
+ logger.info("initSyncData file: " + file); // change back
file.delete();
currentFile.set(file);
if (!file.getParentFile().exists()) {
@@ -203,11 +219,13 @@ public class SyncServiceImpl implements SyncService.Iface {
currentFileWriter.set(new FileOutputStream(file));
syncLog.get().startSyncTsFiles();
messageDigest.set(MessageDigest.getInstance(SyncConstant.MESSAGE_DIGIT_NAME));
+ logger.info(
+ "initSyncData currentFileWriter: " + currentFileWriter.get().toString()); // change back
} catch (IOException | NoSuchAlgorithmException e) {
- logger.error("Can not init sync resource for file {}", filename, e);
+ logger.error("Can not init sync resource for file {}", filePath, e);
return getErrorResult(
String.format(
- "Can not init sync resource for file %s because %s", filename, e.getMessage()));
+ "Can not init sync resource for file %s because %s", filePath, e.getMessage()));
}
return getSuccessResult();
}
@@ -216,6 +234,11 @@ public class SyncServiceImpl implements SyncService.Iface {
public SyncStatus syncData(ByteBuffer buff) {
try {
int pos = buff.position();
+ logger.info("syncData currentSG: " + currentSG.get()); // change back
+ logger.info("syncData currentFileWriter: " + currentFileWriter.get().toString());
+ logger.info(
+ "syncData currentFileWriterChannel is Open: "
+ + currentFileWriter.get().getChannel().isOpen());
currentFileWriter.get().getChannel().write(buff);
buff.position(pos);
messageDigest.get().update(buff);
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
index 6ff1143..06ed0b6 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
@@ -586,15 +586,14 @@ public class SyncClient implements ISyncClient {
if (!isSyncConnect && !reconnect()) {
continue;
}
- if (serviceClient.syncDeletedFileName(getFileNameWithSG(file)).code == SUCCESS_CODE) {
- logger.info(
- "Receiver has received deleted file name {} successfully.", getFileNameWithSG(file));
+ if (serviceClient.syncDeletedFileName(getFileInfoWithVgAndTimePartition(file)).code
+ == SUCCESS_CODE) {
+ logger.info("Receiver has received deleted file name {} successfully.", file.getPath());
lastLocalFilesMap.get(sgName).get(vgId).get(timeRangeId).remove(file);
syncLog.finishSyncDeletedFileName(file);
}
} catch (TException e) {
logger.error("Can not sync deleted file name {}, skip it.", file);
- isSyncConnect = false;
}
}
logger.info("Finish to sync names of deleted files in storage group {}", sgName);
@@ -660,7 +659,7 @@ public class SyncClient implements ISyncClient {
try {
int retryCount = 0;
MessageDigest md = MessageDigest.getInstance(SyncConstant.MESSAGE_DIGIT_NAME);
- serviceClient.initSyncData(getFileNameWithSG(snapshotFile));
+ serviceClient.initSyncData(getFileInfoWithVgAndTimePartition(snapshotFile));
outer:
while (true) {
retryCount++;
@@ -782,13 +781,11 @@ public class SyncClient implements ISyncClient {
SyncClient.config = config;
}
- private String getFileNameWithSG(File file) {
- return file.getParentFile().getParentFile().getParentFile().getName()
- + File.separator
- + file.getParentFile().getParentFile().getName()
- + File.separator
+ private String getFileInfoWithVgAndTimePartition(File file) {
+ return file.getParentFile().getParentFile().getName()
+ + SyncConstant.SYNC_DIR_NAME_SEPARATOR
+ file.getParentFile().getName()
- + File.separator
+ + SyncConstant.SYNC_DIR_NAME_SEPARATOR
+ file.getName();
}
}