This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch remove-sync-entry in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 1fbb14fb07227b55637ffb0c0c2bae6ea2d81a36 Author: Steve Yurong Su <[email protected]> AuthorDate: Sat Jun 17 04:43:27 2023 +0800 refactor --- .../pipe/connector/legacy/IoTDBSyncReceiver.java | 112 ++++++++++++++++----- .../connector/legacy/loader/DeletionLoader.java | 3 +- .../legacy/pipedata/DeletionPipeData.java | 14 --- .../connector/legacy/pipedata/TsFilePipeData.java | 83 --------------- .../legacy/transport/SyncIdentityInfo.java | 57 ----------- .../pipe/connector/legacy/utils/SyncConstant.java | 38 ------- .../pipe/connector/legacy/utils/SyncPathUtil.java | 74 -------------- 7 files changed, 90 insertions(+), 291 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncReceiver.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncReceiver.java index 5189a3635ba..c2b651c1e62 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncReceiver.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncReceiver.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.pipe.connector.legacy; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -30,9 +31,6 @@ import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult; import org.apache.iotdb.db.mpp.plan.statement.metadata.DatabaseSchemaStatement; import org.apache.iotdb.db.pipe.connector.legacy.pipedata.PipeData; import org.apache.iotdb.db.pipe.connector.legacy.pipedata.TsFilePipeData; -import org.apache.iotdb.db.pipe.connector.legacy.transport.SyncIdentityInfo; -import org.apache.iotdb.db.pipe.connector.legacy.utils.SyncConstant; -import org.apache.iotdb.db.pipe.connector.legacy.utils.SyncPathUtil; import org.apache.iotdb.db.query.control.SessionManager; import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.iotdb.rpc.RpcUtils; @@ -53,11 +51,12 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; -/** This class is responsible for implementing the RPC processing on the receiver-side. */ public class IoTDBSyncReceiver { private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBSyncReceiver.class); + private static final String PATCH_SUFFIX = ".patch"; + // When the client abnormally exits, we can still know who to disconnect private final ThreadLocal<Long> currentConnectionId = new ThreadLocal<>(); @@ -74,7 +73,7 @@ public class IoTDBSyncReceiver { // The sync connectionId is unique in one IoTDB instance. private final AtomicLong connectionIdGenerator = new AtomicLong(); - ////////////// Interfaces and Implementation of RPC Handler //////////////////////// + //////////////////////// methods for RPC handler //////////////////////// /** * release resources or cleanup when a client (a sender) is disconnected (normally or abnormally). @@ -102,8 +101,8 @@ public class IoTDBSyncReceiver { SyncIdentityInfo identityInfo = new SyncIdentityInfo(tIdentityInfo, remoteAddress); LOGGER.info("Invoke handshake method from client ip = {}", identityInfo.getRemoteAddress()); - if (!new File(SyncPathUtil.getFileDataDirPath(identityInfo)).exists()) { - new File(SyncPathUtil.getFileDataDirPath(identityInfo)).mkdirs(); + if (!new File(getFileDataDir(identityInfo)).exists()) { + new File(getFileDataDir(identityInfo)).mkdirs(); } createConnection(identityInfo); if (!StringUtils.isEmpty(identityInfo.getDatabase())) { @@ -173,7 +172,7 @@ public class IoTDBSyncReceiver { } LOGGER.debug( "Invoke transportPipeData method from client ip = {}", identityInfo.getRemoteAddress()); - String fileDir = SyncPathUtil.getFileDataDirPath(identityInfo); + String fileDir = getFileDataDir(identityInfo); // step2. deserialize PipeData PipeData pipeData; @@ -237,9 +236,7 @@ public class IoTDBSyncReceiver { String tsFileName = tsFilePipeData.getTsFileName(); File dir = new File(fileDir); File[] targetFiles = - dir.listFiles( - (dir1, name) -> - name.startsWith(tsFileName) && name.endsWith(SyncConstant.PATCH_SUFFIX)); + dir.listFiles((dir1, name) -> name.startsWith(tsFileName) && name.endsWith(PATCH_SUFFIX)); if (targetFiles != null) { for (File targetFile : targetFiles) { File newFile = @@ -247,8 +244,7 @@ public class IoTDBSyncReceiver { dir, targetFile .getName() - .substring( - 0, targetFile.getName().length() - SyncConstant.PATCH_SUFFIX.length())); + .substring(0, targetFile.getName().length() - PATCH_SUFFIX.length())); targetFile.renameTo(newFile); } } @@ -274,14 +270,14 @@ public class IoTDBSyncReceiver { LOGGER.debug( "Invoke transportData method from client ip = {}", identityInfo.getRemoteAddress()); - String fileDir = SyncPathUtil.getFileDataDirPath(identityInfo); + String fileDir = getFileDataDir(identityInfo); String fileName = metaInfo.fileName; long startIndex = metaInfo.startIndex; - File file = new File(fileDir, fileName + SyncConstant.PATCH_SUFFIX); + File file = new File(fileDir, fileName + PATCH_SUFFIX); // step2. check startIndex try { - CheckResult result = checkStartIndexValid(new File(fileDir, fileName), startIndex); + IndexCheckResult result = checkStartIndexValid(new File(fileDir, fileName), startIndex); if (!result.isResult()) { return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_REDIRECTION_ERROR, result.getIndex()); } @@ -314,7 +310,7 @@ public class IoTDBSyncReceiver { return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, ""); } - private CheckResult checkStartIndexValid(File file, long startIndex) throws IOException { + private IndexCheckResult checkStartIndexValid(File file, long startIndex) throws IOException { // get local index from memory map long localIndex = getCurrentFileStartIndex(file.getAbsolutePath()); // get local index from file @@ -328,16 +324,16 @@ public class IoTDBSyncReceiver { "The start index {} of data sync is not valid. " + "The file is not exist and start index should equal to 0).", startIndex); - return new CheckResult(false, "0"); + return new IndexCheckResult(false, "0"); } else if (localIndex >= 0 && localIndex != startIndex) { LOGGER.error( "The start index {} of data sync is not valid. " + "The start index of the file should equal to {}.", startIndex, localIndex); - return new CheckResult(false, String.valueOf(localIndex)); + return new IndexCheckResult(false, String.valueOf(localIndex)); } - return new CheckResult(true, "0"); + return new IndexCheckResult(true, "0"); } /** @@ -365,12 +361,82 @@ public class IoTDBSyncReceiver { } } - private static class CheckResult { + ///////////////////////// sync data dir structure ///////////////////////// + + // data/sync + // |----receiver dir + // | |-----receiver pipe dir + // | |----file data dir + + private static final String RECEIVER_DIR_NAME = "receiver"; + private static final String FILE_DATA_DIR_NAME = "file-data"; + + private static String getFileDataDir(SyncIdentityInfo identityInfo) { + return getReceiverPipeDir( + identityInfo.getPipeName(), + identityInfo.getRemoteAddress(), + identityInfo.getCreateTime()) + + File.separator + + FILE_DATA_DIR_NAME; + } + + private static String getReceiverPipeDir(String pipeName, String remoteIp, long createTime) { + return getReceiverDir() + + File.separator + + String.format("%s-%d-%s", pipeName, createTime, remoteIp); + } + + private static String getReceiverDir() { + return CommonDescriptor.getInstance().getConfig().getSyncDir() + + File.separator + + RECEIVER_DIR_NAME; + } + + ///////////////////// helper classes ////////////////////// + + private static class SyncIdentityInfo { + + private final String pipeName; + private final long createTime; + private final String version; + private final String database; + private final String remoteAddress; + + public SyncIdentityInfo(TSyncIdentityInfo identityInfo, String remoteAddress) { + this.pipeName = identityInfo.getPipeName(); + this.createTime = identityInfo.getCreateTime(); + this.version = identityInfo.getVersion(); + this.database = identityInfo.getDatabase(); + this.remoteAddress = remoteAddress; + } + + public String getPipeName() { + return pipeName; + } + + public long getCreateTime() { + return createTime; + } + + public String getVersion() { + return version; + } + + public String getRemoteAddress() { + return remoteAddress; + } + + public String getDatabase() { + return database; + } + } + + private static class IndexCheckResult { private final boolean result; private final String index; - public CheckResult(boolean result, String index) { + public IndexCheckResult(boolean result, String index) { this.result = result; this.index = index; } @@ -384,7 +450,7 @@ public class IoTDBSyncReceiver { } } - ///////////////////////// Singleton ///////////////////////// + ///////////////////////// singleton ///////////////////////// private IoTDBSyncReceiver() {} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/loader/DeletionLoader.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/loader/DeletionLoader.java index 8445a6fdcd6..f6435ee1a61 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/loader/DeletionLoader.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/loader/DeletionLoader.java @@ -27,7 +27,6 @@ import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult; import org.apache.iotdb.db.mpp.plan.statement.Statement; import org.apache.iotdb.db.mpp.plan.statement.crud.DeleteDataStatement; import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteTimeSeriesStatement; -import org.apache.iotdb.db.pipe.connector.legacy.exception.SyncDataLoadException; import org.apache.iotdb.db.query.control.SessionManager; import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.iotdb.rpc.TSStatusCode; @@ -49,7 +48,7 @@ public class DeletionLoader implements ILoader { } @Override - public void load() throws SyncDataLoadException { + public void load() throws PipeException { if (CommonDescriptor.getInstance().getConfig().isReadOnly()) { throw new PipeException("storage engine readonly"); } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/DeletionPipeData.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/DeletionPipeData.java index 96fb8601dfd..a98d19b5356 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/DeletionPipeData.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/DeletionPipeData.java @@ -25,16 +25,12 @@ import org.apache.iotdb.db.pipe.connector.legacy.loader.DeletionLoader; import org.apache.iotdb.db.pipe.connector.legacy.loader.ILoader; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.util.Objects; public class DeletionPipeData extends PipeData { - private static final Logger logger = LoggerFactory.getLogger(DeletionPipeData.class); private String database; private Deletion deletion; @@ -43,16 +39,6 @@ public class DeletionPipeData extends PipeData { super(); } - public DeletionPipeData(Deletion deletion, long serialNumber) { - this("", deletion, serialNumber); - } - - public DeletionPipeData(String sgName, Deletion deletion, long serialNumber) { - super(serialNumber); - this.database = sgName; - this.deletion = deletion; - } - @Override public PipeDataType getPipeDataType() { return PipeDataType.DELETION; diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/TsFilePipeData.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/TsFilePipeData.java index 1f2d93d3879..803a5c67427 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/TsFilePipeData.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/TsFilePipeData.java @@ -20,27 +20,17 @@ package org.apache.iotdb.db.pipe.connector.legacy.pipedata; import org.apache.iotdb.commons.exception.IllegalPathException; -import org.apache.iotdb.db.engine.modification.ModificationFile; -import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.pipe.connector.legacy.loader.ILoader; import org.apache.iotdb.db.pipe.connector.legacy.loader.TsFileLoader; -import org.apache.iotdb.db.pipe.connector.legacy.utils.SyncConstant; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.File; -import java.io.FileNotFoundException; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; import java.util.Objects; public class TsFilePipeData extends PipeData { - private static final Logger logger = LoggerFactory.getLogger(TsFilePipeData.class); private String parentDirPath; private String tsFileName; @@ -50,22 +40,6 @@ public class TsFilePipeData extends PipeData { super(); } - public TsFilePipeData(String tsFilePath, long serialNumber) { - super(serialNumber); - String sep = File.separator.equals("\\") ? "\\\\" : File.separator; - String[] paths = tsFilePath.split(sep); - tsFileName = paths[paths.length - 1]; - if (paths.length > 1) { - parentDirPath = - tsFilePath.substring( - 0, tsFilePath.length() - tsFileName.length() - File.separator.length()); - } else { - parentDirPath = ""; - } - - initDatabaseName(); - } - public TsFilePipeData(String parentDirPath, String tsFileName, long serialNumber) { super(serialNumber); this.parentDirPath = parentDirPath; @@ -104,14 +78,6 @@ public class TsFilePipeData extends PipeData { return parentDirPath + File.separator + tsFileName; } - public String getResourceFilePath() { - return getTsFilePath() + TsFileResource.RESOURCE_SUFFIX; - } - - public String getModsFilePath() { - return getTsFilePath() + ModificationFile.FILE_SUFFIX; - } - public void setDatabase(String database) { this.database = database; } @@ -148,55 +114,6 @@ public class TsFilePipeData extends PipeData { return new TsFileLoader(new File(getTsFilePath()), database); } - public List<File> getTsFiles(boolean shouldWaitForTsFileClose) throws FileNotFoundException { - File tsFile = new File(getTsFilePath()).getAbsoluteFile(); - File resource = new File(getResourceFilePath()); - File mods = new File(getModsFilePath()); - - List<File> files = new ArrayList<>(); - if (!tsFile.exists()) { - throw new FileNotFoundException(String.format("Can not find %s.", tsFile.getAbsolutePath())); - } - files.add(tsFile); - if (resource.exists()) { - files.add(resource); - } else { - if (shouldWaitForTsFileClose && !waitForTsFileClose()) { - throw new FileNotFoundException( - String.format( - "Can not find %s, maybe the tsfile is not closed yet", resource.getAbsolutePath())); - } - } - if (mods.exists()) { - files.add(mods); - } - return files; - } - - private boolean waitForTsFileClose() { - for (int i = 0; i < SyncConstant.DEFAULT_WAITING_FOR_TSFILE_RETRY_NUMBER; i++) { - if (isTsFileClosed()) { - return true; - } - try { - Thread.sleep(SyncConstant.DEFAULT_WAITING_FOR_TSFILE_CLOSE_MILLISECONDS); - } catch (InterruptedException e) { - logger.warn(String.format("Be Interrupted when waiting for tsfile %s closed", tsFileName)); - } - logger.info( - String.format( - "Waiting for tsfile %s close, retry %d / %d.", - tsFileName, (i + 1), SyncConstant.DEFAULT_WAITING_FOR_TSFILE_RETRY_NUMBER)); - } - return false; - } - - private boolean isTsFileClosed() { - File tsFile = new File(getTsFilePath()).getAbsoluteFile(); - File resource = new File(tsFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX); - return resource.exists(); - } - @Override public String toString() { return "TsFilePipeData{" diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/transport/SyncIdentityInfo.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/transport/SyncIdentityInfo.java deleted file mode 100644 index c1f62c0695c..00000000000 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/transport/SyncIdentityInfo.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iotdb.db.pipe.connector.legacy.transport; - -import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo; - -public class SyncIdentityInfo { - public String pipeName; - public long createTime; - public String version; - public String remoteAddress; - public String database; - - public SyncIdentityInfo(TSyncIdentityInfo identityInfo, String remoteAddress) { - this.pipeName = identityInfo.getPipeName(); - this.createTime = identityInfo.getCreateTime(); - this.version = identityInfo.getVersion(); - this.database = identityInfo.getDatabase(); - this.remoteAddress = remoteAddress; - } - - public String getPipeName() { - return pipeName; - } - - public long getCreateTime() { - return createTime; - } - - public String getVersion() { - return version; - } - - public String getRemoteAddress() { - return remoteAddress; - } - - public String getDatabase() { - return database; - } -} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/utils/SyncConstant.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/utils/SyncConstant.java deleted file mode 100644 index baa87f17f7e..00000000000 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/utils/SyncConstant.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iotdb.db.pipe.connector.legacy.utils; - -public class SyncConstant { - /** common */ - public static final String FILE_DATA_DIR_NAME = "file-data"; - - public static final String PIPE_LOG_NAME_SEPARATOR = "_"; - public static final String PIPE_LOG_NAME_SUFFIX = PIPE_LOG_NAME_SEPARATOR + "pipe.log"; - - // data config - - public static final Long DEFAULT_WAITING_FOR_TSFILE_CLOSE_MILLISECONDS = 500L; - public static final Long DEFAULT_WAITING_FOR_TSFILE_RETRY_NUMBER = 10L; - - /** transport */ - public static final String PATCH_SUFFIX = ".patch"; - - /** receiver */ - public static final String RECEIVER_DIR_NAME = "receiver"; -} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/utils/SyncPathUtil.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/utils/SyncPathUtil.java deleted file mode 100644 index 39b334c51ca..00000000000 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/utils/SyncPathUtil.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.db.pipe.connector.legacy.utils; - -import org.apache.iotdb.commons.conf.CommonDescriptor; -import org.apache.iotdb.db.pipe.connector.legacy.transport.SyncIdentityInfo; - -import java.io.File; - -/** Util for path generation in sync module */ -public class SyncPathUtil { - - private SyncPathUtil() { - // forbidding instantiation - } - - // sync data structure - // data/sync - // |----sender dir - // | |----sender pipe dir - // | |----history pipe log dir - // | |----realtime pipe log dir - // | |----file data dir - // |----receiver dir - // | |-----receiver pipe dir - // | |----receiver pipe log dir - // | |----file data dir - // |----sys dir - - /** receiver */ - public static String getReceiverDir() { - return CommonDescriptor.getInstance().getConfig().getSyncDir() - + File.separator - + SyncConstant.RECEIVER_DIR_NAME; - } - - public static String getReceiverPipeDir(String pipeName, String remoteIp, long createTime) { - return getReceiverDir() - + File.separator - + getReceiverPipeDirName(pipeName, remoteIp, createTime); - } - - public static String getReceiverPipeDirName(String pipeName, String remoteIp, long createTime) { - return String.format("%s-%d-%s", pipeName, createTime, remoteIp); - } - - public static String getReceiverFileDataDir(String pipeName, String remoteIp, long createTime) { - return getReceiverPipeDir(pipeName, remoteIp, createTime) - + File.separator - + SyncConstant.FILE_DATA_DIR_NAME; - } - - public static String getFileDataDirPath(SyncIdentityInfo identityInfo) { - return SyncPathUtil.getReceiverFileDataDir( - identityInfo.getPipeName(), identityInfo.getRemoteAddress(), identityInfo.getCreateTime()); - } -}
