This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch remove-sync-entry in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit f76c722e2e5e8ac971c0584850e9e1aa43584b3c Author: Steve Yurong Su <[email protected]> AuthorDate: Sat Jun 17 04:20:24 2023 +0800 refactor --- .../org/apache/iotdb/db/audit/AuditLogger.java | 2 +- .../config/constant/PipeConnectorConstant.java | 3 + .../pipe/connector/legacy/IoTDBSyncConnector.java | 20 +- .../pipe/connector/legacy/IoTDBSyncReceiver.java | 395 ++++++++----------- .../legacy/exception/SyncDataLoadException.java | 28 -- .../{pipedata/load => loader}/DeletionLoader.java | 16 +- .../legacy/{pipedata/load => loader}/ILoader.java | 2 +- .../{pipedata/load => loader}/TsFileLoader.java | 13 +- .../legacy/pipedata/DeletionPipeData.java | 4 +- .../pipe/connector/legacy/pipedata/PipeData.java | 11 +- .../connector/legacy/pipedata/TsFilePipeData.java | 4 +- .../pipedata/queue/BufferedPipeDataQueue.java | 426 --------------------- .../legacy/pipedata/queue/PipeDataQueue.java | 41 -- .../pipe/connector/legacy/utils/SyncConstant.java | 24 -- .../pipe/connector/legacy/utils/SyncPathUtil.java | 17 - 15 files changed, 205 insertions(+), 801 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java b/server/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java index ab7df92e591..aaeb3fde212 100644 --- a/server/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java +++ b/server/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java @@ -43,7 +43,7 @@ import org.slf4j.LoggerFactory; import java.util.List; -import static org.apache.iotdb.db.pipe.connector.legacy.pipedata.load.ILoader.SCHEMA_FETCHER; +import static org.apache.iotdb.db.pipe.connector.legacy.loader.ILoader.SCHEMA_FETCHER; public class AuditLogger { private static final Logger logger = LoggerFactory.getLogger(AuditLogger.class); diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java b/server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java index d0b9002d2d4..24bd5a51e75 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java @@ -33,6 +33,9 @@ public class PipeConnectorConstant { public static final String CONNECTOR_IOTDB_PASSWORD_KEY = "connector.password"; public static final String CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE = "root"; + public static final String CONNECTOR_IOTDB_SYNC_CONNECTOR_VERSION_KEY = "connector.version"; + public static final String CONNECTOR_IOTDB_SYNC_CONNECTOR_VERSION_DEFAULT_VALUE = "1.1"; + private PipeConnectorConstant() { throw new IllegalStateException("Utility class"); } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncConnector.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncConnector.java index a7ebbecff51..b4f48208c72 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncConnector.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncConnector.java @@ -61,6 +61,8 @@ import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CON import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE; import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_KEY; import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY; +import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_SYNC_CONNECTOR_VERSION_DEFAULT_VALUE; +import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_SYNC_CONNECTOR_VERSION_KEY; import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE; import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_KEY; @@ -69,7 +71,6 @@ public class IoTDBSyncConnector implements PipeConnector { private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBSyncConnector.class); private static final CommonConfig COMMON_CONFIG = CommonDescriptor.getInstance().getConfig(); - public static final String IOTDB_SYNC_CONNECTOR_VERSION = "1.1"; private String ipAddress; private int port; @@ -77,6 +78,8 @@ public class IoTDBSyncConnector implements PipeConnector { private String user; private String password; + private String syncConnectorVersion; + private String pipeName; private Long creationTime; @@ -94,15 +97,20 @@ public class IoTDBSyncConnector implements PipeConnector { @Override public void customize(PipeParameters parameters, PipeConnectorRuntimeConfiguration configuration) throws Exception { - this.ipAddress = parameters.getString(CONNECTOR_IOTDB_IP_KEY); - this.port = parameters.getInt(CONNECTOR_IOTDB_PORT_KEY); + ipAddress = parameters.getString(CONNECTOR_IOTDB_IP_KEY); + port = parameters.getInt(CONNECTOR_IOTDB_PORT_KEY); - this.user = + user = parameters.getStringOrDefault(CONNECTOR_IOTDB_USER_KEY, CONNECTOR_IOTDB_USER_DEFAULT_VALUE); - this.password = + password = parameters.getStringOrDefault( CONNECTOR_IOTDB_PASSWORD_KEY, CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE); + syncConnectorVersion = + parameters.getStringOrDefault( + CONNECTOR_IOTDB_SYNC_CONNECTOR_VERSION_KEY, + CONNECTOR_IOTDB_SYNC_CONNECTOR_VERSION_DEFAULT_VALUE); + pipeName = configuration.getRuntimeEnvironment().getPipeName(); creationTime = configuration.getRuntimeEnvironment().getCreationTime(); } @@ -123,7 +131,7 @@ public class IoTDBSyncConnector implements PipeConnector { try { final TSyncIdentityInfo identityInfo = new TSyncIdentityInfo( - pipeName, creationTime, IOTDB_SYNC_CONNECTOR_VERSION, IoTDBConstant.PATH_ROOT); + pipeName, creationTime, syncConnectorVersion, IoTDBConstant.PATH_ROOT); final TSStatus status = client.handshake(identityInfo); if (status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { String errorMsg = diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncReceiver.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncReceiver.java index 79480a4ea2c..5189a3635ba 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncReceiver.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncReceiver.java @@ -22,20 +22,19 @@ package org.apache.iotdb.db.pipe.connector.legacy; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.path.PartialPath; -import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.mpp.plan.Coordinator; import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher; import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher; import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult; import org.apache.iotdb.db.mpp.plan.statement.metadata.DatabaseSchemaStatement; -import org.apache.iotdb.db.pipe.connector.legacy.exception.SyncDataLoadException; import org.apache.iotdb.db.pipe.connector.legacy.pipedata.PipeData; import org.apache.iotdb.db.pipe.connector.legacy.pipedata.TsFilePipeData; import org.apache.iotdb.db.pipe.connector.legacy.transport.SyncIdentityInfo; import org.apache.iotdb.db.pipe.connector.legacy.utils.SyncConstant; import org.apache.iotdb.db.pipe.connector.legacy.utils.SyncPathUtil; import org.apache.iotdb.db.query.control.SessionManager; +import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo; @@ -50,9 +49,6 @@ import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteBuffer; -import java.text.DecimalFormat; -import java.util.ArrayList; -import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; @@ -60,96 +56,38 @@ import java.util.concurrent.atomic.AtomicLong; /** This class is responsible for implementing the RPC processing on the receiver-side. */ public class IoTDBSyncReceiver { - public static IoTDBSyncReceiver getInstance() { - return IoTDBSyncReceiverHolder.INSTANCE; - } - - private static class IoTDBSyncReceiverHolder { - private static final IoTDBSyncReceiver INSTANCE = new IoTDBSyncReceiver(); - } - - private static final Logger logger = LoggerFactory.getLogger(IoTDBSyncReceiver.class); - - private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); + private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBSyncReceiver.class); // When the client abnormally exits, we can still know who to disconnect - private final ThreadLocal<Long> currentConnectionId; - // Record the remote message for every rpc connection - private final Map<Long, SyncIdentityInfo> connectionIdToIdentityInfoMap; - // Record the remote message for every rpc connection - private final Map<Long, Map<String, Long>> connectionIdToStartIndexRecord; - private final Map<String, String> registeredDatabase; - - // The sync connectionId is unique in one IoTDB instance. - private final AtomicLong connectionIdGenerator; - - private IoTDBSyncReceiver() { - currentConnectionId = new ThreadLocal<>(); - connectionIdToIdentityInfoMap = new ConcurrentHashMap<>(); - connectionIdToStartIndexRecord = new ConcurrentHashMap<>(); - registeredDatabase = new ConcurrentHashMap<>(); - connectionIdGenerator = new AtomicLong(); - } + private final ThreadLocal<Long> currentConnectionId = new ThreadLocal<>(); - // region Interfaces and Implementation of Index Checker - - private class CheckResult { - boolean result; - String index; + // Record the remote message for every rpc connection + private final Map<Long, SyncIdentityInfo> connectionIdToIdentityInfoMap = + new ConcurrentHashMap<>(); - public CheckResult(boolean result, String index) { - this.result = result; - this.index = index; - } + // Record the remote message for every rpc connection + private final Map<Long, Map<String, Long>> connectionIdToStartIndexRecord = + new ConcurrentHashMap<>(); - public boolean isResult() { - return result; - } + private final Map<String, String> registeredDatabase = new ConcurrentHashMap<>(); - public String getIndex() { - return index; - } - } + // The sync connectionId is unique in one IoTDB instance. + private final AtomicLong connectionIdGenerator = new AtomicLong(); - private CheckResult checkStartIndexValid(File file, long startIndex) throws IOException { - // get local index from memory map - long localIndex = getCurrentFileStartIndex(file.getAbsolutePath()); - // get local index from file - if (localIndex < 0 && file.exists()) { - localIndex = file.length(); - recordStartIndex(file, localIndex); - } - // compare and check - if (localIndex < 0 && startIndex != 0) { - logger.error( - "The start index {} of data sync is not valid. " - + "The file is not exist and start index should equal to 0).", - startIndex); - return new CheckResult(false, "0"); - } else if (localIndex >= 0 && localIndex != startIndex) { - logger.error( - "The start index {} of data sync is not valid. " - + "The start index of the file should equal to {}.", - startIndex, - localIndex); - return new CheckResult(false, String.valueOf(localIndex)); - } - return new CheckResult(true, "0"); - } + ////////////// Interfaces and Implementation of RPC Handler //////////////////////// - private void recordStartIndex(File file, long position) { - Long id = currentConnectionId.get(); - if (id != null) { - Map<String, Long> map = - connectionIdToStartIndexRecord.computeIfAbsent(id, i -> new ConcurrentHashMap<>()); - map.put(file.getAbsolutePath(), position); + /** + * release resources or cleanup when a client (a sender) is disconnected (normally or abnormally). + */ + public void handleClientExit() { + if (currentConnectionId.get() != null) { + long id = currentConnectionId.get(); + connectionIdToIdentityInfoMap.remove(id); + connectionIdToStartIndexRecord.remove(id); + currentConnectionId.remove(); } } - // endregion - - // region Interfaces and Implementation of RPC Handler - /** * Create connection from sender * @@ -162,15 +100,7 @@ public class IoTDBSyncReceiver { IPartitionFetcher partitionFetcher, ISchemaFetcher schemaFetcher) { SyncIdentityInfo identityInfo = new SyncIdentityInfo(tIdentityInfo, remoteAddress); - logger.info("Invoke handshake method from client ip = {}", identityInfo.getRemoteAddress()); - // Version check - if (!config.getIoTDBMajorVersion(identityInfo.version).equals(config.getIoTDBMajorVersion())) { - return RpcUtils.getStatus( - TSStatusCode.PIPESERVER_ERROR, - String.format( - "version mismatch: the sender <%s>, the receiver <%s>", - identityInfo.version, config.getIoTDBVersion())); - } + LOGGER.info("Invoke handshake method from client ip = {}", identityInfo.getRemoteAddress()); if (!new File(SyncPathUtil.getFileDataDirPath(identityInfo)).exists()) { new File(SyncPathUtil.getFileDataDirPath(identityInfo)).mkdirs(); @@ -186,44 +116,45 @@ public class IoTDBSyncReceiver { return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, ""); } - /** - * Verify IP address with IP white list which contains more than one IP segment. It's used by sync - * sender. - */ - private boolean verifyIPSegment(String ipWhiteList, String ipAddress) { - String[] ipSegments = ipWhiteList.split(","); - for (String IPsegment : ipSegments) { - int subnetMask = Integer.parseInt(IPsegment.substring(IPsegment.indexOf('/') + 1)); - IPsegment = IPsegment.substring(0, IPsegment.indexOf('/')); - if (verifyIP(IPsegment, ipAddress, subnetMask)) { - return true; - } - } - return false; + private void createConnection(SyncIdentityInfo identityInfo) { + long connectionId = connectionIdGenerator.incrementAndGet(); + currentConnectionId.set(connectionId); + connectionIdToIdentityInfoMap.put(connectionId, identityInfo); } - /** Verify IP address with IP segment. */ - private boolean verifyIP(String ipSegment, String ipAddress, int subnetMark) { - String ipSegmentBinary; - String ipAddressBinary; - String[] ipSplits = ipSegment.split(SyncConstant.IP_SEPARATOR); - DecimalFormat df = new DecimalFormat("00000000"); - StringBuilder ipSegmentBuilder = new StringBuilder(); - for (String IPsplit : ipSplits) { - ipSegmentBuilder.append( - df.format(Integer.parseInt(Integer.toBinaryString(Integer.parseInt(IPsplit))))); + private boolean registerDatabase( + String database, IPartitionFetcher partitionFetcher, ISchemaFetcher schemaFetcher) { + if (registeredDatabase.containsKey(database)) { + return true; } - ipSegmentBinary = ipSegmentBuilder.toString(); - ipSegmentBinary = ipSegmentBinary.substring(0, subnetMark); - ipSplits = ipAddress.split(SyncConstant.IP_SEPARATOR); - StringBuilder ipAddressBuilder = new StringBuilder(); - for (String IPsplit : ipSplits) { - ipAddressBuilder.append( - df.format(Integer.parseInt(Integer.toBinaryString(Integer.parseInt(IPsplit))))); + try { + DatabaseSchemaStatement statement = + new DatabaseSchemaStatement(DatabaseSchemaStatement.DatabaseSchemaStatementType.CREATE); + statement.setDatabasePath(new PartialPath(database)); + long queryId = SessionManager.getInstance().requestQueryId(); + ExecutionResult result = + Coordinator.getInstance() + .execute( + statement, + queryId, + null, + "", + partitionFetcher, + schemaFetcher, + IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold()); + if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode() + && result.status.code != TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) { + LOGGER.error("Create Database error, statement: {}.", statement); + LOGGER.error("Create database result status : {}.", result.status); + return false; + } + } catch (IllegalPathException e) { + LOGGER.error(String.format("Parse database PartialPath %s error", database), e); + return false; } - ipAddressBinary = ipAddressBuilder.toString(); - ipAddressBinary = ipAddressBinary.substring(0, subnetMark); - return ipAddressBinary.equals(ipSegmentBinary); + + registeredDatabase.put(database, ""); + return true; } /** @@ -240,7 +171,7 @@ public class IoTDBSyncReceiver { if (identityInfo == null) { throw new TException("Thrift connection is not alive."); } - logger.debug( + LOGGER.debug( "Invoke transportPipeData method from client ip = {}", identityInfo.getRemoteAddress()); String fileDir = SyncPathUtil.getFileDataDirPath(identityInfo); @@ -257,23 +188,23 @@ public class IoTDBSyncReceiver { handleTsFilePipeData(tsFilePipeData, fileDir); } } catch (IOException | IllegalPathException e) { - logger.error("Pipe data transport error, {}", e.getMessage()); + LOGGER.error("Pipe data transport error, {}", e.getMessage()); return RpcUtils.getStatus( TSStatusCode.PIPESERVER_ERROR, "Pipe data transport error, " + e.getMessage()); } // step3. load PipeData - logger.info( + LOGGER.info( "Start load pipeData with serialize number {} and type {},value={}", pipeData.getSerialNumber(), pipeData.getPipeDataType(), pipeData); try { pipeData.createLoader().load(); - logger.info( + LOGGER.info( "Load pipeData with serialize number {} successfully.", pipeData.getSerialNumber()); - } catch (SyncDataLoadException e) { - logger.error("Fail to load pipeData because {}.", e.getMessage()); + } catch (PipeException e) { + LOGGER.error("Fail to load pipeData because {}.", e.getMessage()); return RpcUtils.getStatus( TSStatusCode.PIPESERVER_ERROR, "Fail to load pipeData because " + e.getMessage()); } @@ -281,6 +212,49 @@ public class IoTDBSyncReceiver { return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, ""); } + /** + * Get current SyncIdentityInfo + * + * @return null if connection has been exited + */ + private SyncIdentityInfo getCurrentSyncIdentityInfo() { + Long id = currentConnectionId.get(); + if (id != null) { + return connectionIdToIdentityInfoMap.get(id); + } else { + return null; + } + } + + /** + * handle when successfully receive tsFilePipeData. Rename .patch file and reset tsFilePipeData's + * path. + * + * @param tsFilePipeData pipeData + * @param fileDir path of file data dir + */ + private void handleTsFilePipeData(TsFilePipeData tsFilePipeData, String fileDir) { + String tsFileName = tsFilePipeData.getTsFileName(); + File dir = new File(fileDir); + File[] targetFiles = + dir.listFiles( + (dir1, name) -> + name.startsWith(tsFileName) && name.endsWith(SyncConstant.PATCH_SUFFIX)); + if (targetFiles != null) { + for (File targetFile : targetFiles) { + File newFile = + new File( + dir, + targetFile + .getName() + .substring( + 0, targetFile.getName().length() - SyncConstant.PATCH_SUFFIX.length())); + targetFile.renameTo(newFile); + } + } + tsFilePipeData.setParentDirPath(dir.getAbsolutePath()); + } + /** * Receive TsFile based on startIndex. * @@ -297,7 +271,7 @@ public class IoTDBSyncReceiver { if (identityInfo == null) { throw new TException("Thrift connection is not alive."); } - logger.debug( + LOGGER.debug( "Invoke transportData method from client ip = {}", identityInfo.getRemoteAddress()); String fileDir = SyncPathUtil.getFileDataDirPath(identityInfo); @@ -312,7 +286,7 @@ public class IoTDBSyncReceiver { return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_REDIRECTION_ERROR, result.getIndex()); } } catch (IOException e) { - logger.error(e.getMessage()); + LOGGER.error(e.getMessage()); return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_ERROR, e.getMessage()); } @@ -324,7 +298,7 @@ public class IoTDBSyncReceiver { buff.get(byteArray); randomAccessFile.write(byteArray); recordStartIndex(new File(fileDir, fileName), startIndex + length); - logger.debug( + LOGGER.debug( "Sync " + fileName + " start at " @@ -333,63 +307,37 @@ public class IoTDBSyncReceiver { + (startIndex + length) + " is done."); } catch (IOException e) { - logger.error(e.getMessage()); + LOGGER.error(e.getMessage()); return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_ERROR, e.getMessage()); } return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, ""); } - /** - * handle when successfully receive tsFilePipeData. Rename .patch file and reset tsFilePipeData's - * path. - * - * @param tsFilePipeData pipeData - * @param fileDir path of file data dir - */ - private void handleTsFilePipeData(TsFilePipeData tsFilePipeData, String fileDir) { - String tsFileName = tsFilePipeData.getTsFileName(); - File dir = new File(fileDir); - File[] targetFiles = - dir.listFiles( - (dir1, name) -> - name.startsWith(tsFileName) && name.endsWith(SyncConstant.PATCH_SUFFIX)); - if (targetFiles != null) { - for (File targetFile : targetFiles) { - File newFile = - new File( - dir, - targetFile - .getName() - .substring( - 0, targetFile.getName().length() - SyncConstant.PATCH_SUFFIX.length())); - targetFile.renameTo(newFile); - } + private CheckResult checkStartIndexValid(File file, long startIndex) throws IOException { + // get local index from memory map + long localIndex = getCurrentFileStartIndex(file.getAbsolutePath()); + // get local index from file + if (localIndex < 0 && file.exists()) { + localIndex = file.length(); + recordStartIndex(file, localIndex); } - tsFilePipeData.setParentDirPath(dir.getAbsolutePath()); - } - - // endregion - - // region Interfaces and Implementation of Connection Manager - - /** Check if the connection is legally established by handshaking */ - private boolean checkConnection() { - return currentConnectionId.get() != null; - } - - /** - * Get current SyncIdentityInfo - * - * @return null if connection has been exited - */ - private SyncIdentityInfo getCurrentSyncIdentityInfo() { - Long id = currentConnectionId.get(); - if (id != null) { - return connectionIdToIdentityInfoMap.get(id); - } else { - return null; + // compare and check + if (localIndex < 0 && startIndex != 0) { + LOGGER.error( + "The start index {} of data sync is not valid. " + + "The file is not exist and start index should equal to 0).", + startIndex); + return new CheckResult(false, "0"); + } else if (localIndex >= 0 && localIndex != startIndex) { + LOGGER.error( + "The start index {} of data sync is not valid. " + + "The start index of the file should equal to {}.", + startIndex, + localIndex); + return new CheckResult(false, String.valueOf(localIndex)); } + return new CheckResult(true, "0"); } /** @@ -408,62 +356,43 @@ public class IoTDBSyncReceiver { return -1; } - private void createConnection(SyncIdentityInfo identityInfo) { - long connectionId = connectionIdGenerator.incrementAndGet(); - currentConnectionId.set(connectionId); - connectionIdToIdentityInfoMap.put(connectionId, identityInfo); + private void recordStartIndex(File file, long position) { + Long id = currentConnectionId.get(); + if (id != null) { + Map<String, Long> map = + connectionIdToStartIndexRecord.computeIfAbsent(id, i -> new ConcurrentHashMap<>()); + map.put(file.getAbsolutePath(), position); + } } - private boolean registerDatabase( - String database, IPartitionFetcher partitionFetcher, ISchemaFetcher schemaFetcher) { - if (registeredDatabase.containsKey(database)) { - return true; - } - try { - DatabaseSchemaStatement statement = - new DatabaseSchemaStatement(DatabaseSchemaStatement.DatabaseSchemaStatementType.CREATE); - statement.setDatabasePath(new PartialPath(database)); - long queryId = SessionManager.getInstance().requestQueryId(); - ExecutionResult result = - Coordinator.getInstance() - .execute( - statement, - queryId, - null, - "", - partitionFetcher, - schemaFetcher, - IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold()); - if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode() - && result.status.code != TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) { - logger.error("Create Database error, statement: {}.", statement); - logger.error("Create database result status : {}.", result.status); - return false; - } - } catch (IllegalPathException e) { - logger.error(String.format("Parse database PartialPath %s error", database), e); - return false; + private static class CheckResult { + + private final boolean result; + private final String index; + + public CheckResult(boolean result, String index) { + this.result = result; + this.index = index; } - registeredDatabase.put(database, ""); - return true; - } + public boolean isResult() { + return result; + } - /** - * release resources or cleanup when a client (a sender) is disconnected (normally or abnormally). - */ - public void handleClientExit() { - if (checkConnection()) { - long id = currentConnectionId.get(); - connectionIdToIdentityInfoMap.remove(id); - connectionIdToStartIndexRecord.remove(id); - currentConnectionId.remove(); + public String getIndex() { + return index; } } - public List<SyncIdentityInfo> getAllTSyncIdentityInfos() { - return new ArrayList<>(connectionIdToIdentityInfoMap.values()); + ///////////////////////// Singleton ///////////////////////// + + private IoTDBSyncReceiver() {} + + public static IoTDBSyncReceiver getInstance() { + return IoTDBSyncReceiverHolder.INSTANCE; } - // endregion + private static class IoTDBSyncReceiverHolder { + private static final IoTDBSyncReceiver INSTANCE = new IoTDBSyncReceiver(); + } } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/exception/SyncDataLoadException.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/exception/SyncDataLoadException.java deleted file mode 100644 index 0fffb488a4f..00000000000 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/exception/SyncDataLoadException.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iotdb.db.pipe.connector.legacy.exception; - -import org.apache.iotdb.pipe.api.exception.PipeException; - -public class SyncDataLoadException extends PipeException { - - public SyncDataLoadException(String message) { - super(message); - } -} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/load/DeletionLoader.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/loader/DeletionLoader.java similarity index 87% rename from server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/load/DeletionLoader.java rename to server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/loader/DeletionLoader.java index e60a77dee5b..8445a6fdcd6 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/load/DeletionLoader.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/loader/DeletionLoader.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.db.pipe.connector.legacy.pipedata.load; +package org.apache.iotdb.db.pipe.connector.legacy.loader; import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -29,6 +29,7 @@ import org.apache.iotdb.db.mpp.plan.statement.crud.DeleteDataStatement; import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteTimeSeriesStatement; import org.apache.iotdb.db.pipe.connector.legacy.exception.SyncDataLoadException; import org.apache.iotdb.db.query.control.SessionManager; +import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.iotdb.rpc.TSStatusCode; import org.slf4j.Logger; @@ -38,9 +39,10 @@ import java.util.Collections; /** This loader is used to load deletion plan. */ public class DeletionLoader implements ILoader { - private static final Logger logger = LoggerFactory.getLogger(DeletionLoader.class); - private Deletion deletion; + private static final Logger LOGGER = LoggerFactory.getLogger(DeletionLoader.class); + + private final Deletion deletion; public DeletionLoader(Deletion deletion) { this.deletion = deletion; @@ -49,7 +51,7 @@ public class DeletionLoader implements ILoader { @Override public void load() throws SyncDataLoadException { if (CommonDescriptor.getInstance().getConfig().isReadOnly()) { - throw new SyncDataLoadException("storage engine readonly"); + throw new PipeException("storage engine readonly"); } try { Statement statement = generateStatement(); @@ -65,13 +67,13 @@ public class DeletionLoader implements ILoader { SCHEMA_FETCHER, IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold()); if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - logger.error("Delete {} error, statement: {}.", deletion, statement); - logger.error("Delete result status : {}.", result.status); + LOGGER.error("Delete {} error, statement: {}.", deletion, statement); + LOGGER.error("Delete result status : {}.", result.status); throw new LoadFileException( String.format("Can not execute delete statement: %s", statement)); } } catch (Exception e) { - throw new SyncDataLoadException(e.getMessage()); + throw new PipeException(e.getMessage()); } } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/load/ILoader.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/loader/ILoader.java similarity index 95% rename from server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/load/ILoader.java rename to server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/loader/ILoader.java index fe2518beb04..0639dd50ead 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/load/ILoader.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/loader/ILoader.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.db.pipe.connector.legacy.pipedata.load; +package org.apache.iotdb.db.pipe.connector.legacy.loader; import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher; import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher; diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/load/TsFileLoader.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/loader/TsFileLoader.java similarity index 89% rename from server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/load/TsFileLoader.java rename to server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/loader/TsFileLoader.java index 0609dc3056d..6d0be5ab785 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/load/TsFileLoader.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/loader/TsFileLoader.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.db.pipe.connector.legacy.pipedata.load; +package org.apache.iotdb.db.pipe.connector.legacy.loader; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.path.PartialPath; @@ -27,6 +27,7 @@ import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult; import org.apache.iotdb.db.mpp.plan.statement.crud.LoadTsFileStatement; import org.apache.iotdb.db.pipe.connector.legacy.exception.SyncDataLoadException; import org.apache.iotdb.db.query.control.SessionManager; +import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.iotdb.rpc.TSStatusCode; import org.slf4j.Logger; @@ -36,7 +37,8 @@ import java.io.File; /** This loader is used to load tsFiles. If .mods file exists, it will be loaded as well. */ public class TsFileLoader implements ILoader { - private static final Logger logger = LoggerFactory.getLogger(TsFileLoader.class); + + private static final Logger LOGGER = LoggerFactory.getLogger(TsFileLoader.class); private final File tsFile; private final String database; @@ -49,7 +51,6 @@ public class TsFileLoader implements ILoader { @Override public void load() throws SyncDataLoadException { try { - LoadTsFileStatement statement = new LoadTsFileStatement(tsFile.getAbsolutePath()); statement.setDeleteAfterLoad(true); statement.setSgLevel(parseSgLevel()); @@ -68,13 +69,13 @@ public class TsFileLoader implements ILoader { SCHEMA_FETCHER, IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold()); if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - logger.error("Load TsFile {} error, statement: {}.", tsFile.getPath(), statement); - logger.error("Load TsFile result status : {}.", result.status); + LOGGER.error("Load TsFile {} error, statement: {}.", tsFile.getPath(), statement); + LOGGER.error("Load TsFile result status : {}.", result.status); throw new LoadFileException( String.format("Can not execute load TsFile statement: %s", statement)); } } catch (Exception e) { - throw new SyncDataLoadException(e.getMessage()); + throw new PipeException(e.getMessage()); } } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/DeletionPipeData.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/DeletionPipeData.java index b7162c5e00f..96fb8601dfd 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/DeletionPipeData.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/DeletionPipeData.java @@ -21,8 +21,8 @@ package org.apache.iotdb.db.pipe.connector.legacy.pipedata; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.db.engine.modification.Deletion; -import org.apache.iotdb.db.pipe.connector.legacy.pipedata.load.DeletionLoader; -import org.apache.iotdb.db.pipe.connector.legacy.pipedata.load.ILoader; +import org.apache.iotdb.db.pipe.connector.legacy.loader.DeletionLoader; +import org.apache.iotdb.db.pipe.connector.legacy.loader.ILoader; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; import org.slf4j.Logger; diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/PipeData.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/PipeData.java index dae6141019d..659c69876e9 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/PipeData.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/PipeData.java @@ -20,7 +20,7 @@ package org.apache.iotdb.db.pipe.connector.legacy.pipedata; import org.apache.iotdb.commons.exception.IllegalPathException; -import org.apache.iotdb.db.pipe.connector.legacy.pipedata.load.ILoader; +import org.apache.iotdb.db.pipe.connector.legacy.loader.ILoader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,7 +32,8 @@ import java.io.DataOutputStream; import java.io.IOException; public abstract class PipeData { - private static final Logger logger = LoggerFactory.getLogger(PipeData.class); + + private static final Logger LOGGER = LoggerFactory.getLogger(PipeData.class); protected long serialNumber; @@ -46,10 +47,6 @@ public abstract class PipeData { return serialNumber; } - public void setSerialNumber(long serialNumber) { - this.serialNumber = serialNumber; - } - public abstract PipeDataType getPipeDataType(); public long serialize(DataOutputStream stream) throws IOException { @@ -83,7 +80,7 @@ public abstract class PipeData { pipeData = new DeletionPipeData(); break; default: - logger.error("Deserialize PipeData error because Unknown type {}.", type); + LOGGER.error("Deserialize PipeData error because Unknown type {}.", type); throw new UnsupportedOperationException( "Deserialize PipeData error because Unknown type " + type); } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/TsFilePipeData.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/TsFilePipeData.java index 60882c74dd7..1f2d93d3879 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/TsFilePipeData.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/TsFilePipeData.java @@ -22,8 +22,8 @@ package org.apache.iotdb.db.pipe.connector.legacy.pipedata; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.db.engine.modification.ModificationFile; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; -import org.apache.iotdb.db.pipe.connector.legacy.pipedata.load.ILoader; -import org.apache.iotdb.db.pipe.connector.legacy.pipedata.load.TsFileLoader; +import org.apache.iotdb.db.pipe.connector.legacy.loader.ILoader; +import org.apache.iotdb.db.pipe.connector.legacy.loader.TsFileLoader; import org.apache.iotdb.db.pipe.connector.legacy.utils.SyncConstant; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/queue/BufferedPipeDataQueue.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/queue/BufferedPipeDataQueue.java deleted file mode 100644 index 495a6118c02..00000000000 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/queue/BufferedPipeDataQueue.java +++ /dev/null @@ -1,426 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iotdb.db.pipe.connector.legacy.pipedata.queue; - -import org.apache.iotdb.commons.exception.IllegalPathException; -import org.apache.iotdb.commons.utils.FileUtils; -import org.apache.iotdb.db.pipe.connector.legacy.pipedata.PipeData; -import org.apache.iotdb.db.pipe.connector.legacy.pipedata.TsFilePipeData; -import org.apache.iotdb.db.pipe.connector.legacy.utils.SyncConstant; -import org.apache.iotdb.db.pipe.connector.legacy.utils.SyncPathUtil; -import org.apache.iotdb.tsfile.exception.NotImplementedException; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.EOFException; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.RandomAccessFile; -import java.nio.file.Files; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.BlockingDeque; -import java.util.concurrent.LinkedBlockingDeque; - -public class BufferedPipeDataQueue implements PipeDataQueue { - private static final Logger logger = LoggerFactory.getLogger(BufferedPipeDataQueue.class); - - private final String pipeLogDir; - - /** input */ - private long lastMaxSerialNumber; - - private BlockingDeque<PipeData> inputDeque; - - private BlockingDeque<Long> pipeLogStartNumber; - private DataOutputStream outputStream; - private long currentPipeLogSize; - - /** output */ - private final Object waitLock = new Object(); - - private BlockingDeque<PipeData> outputDeque; - - private long pullSerialNumber; - private long commitSerialNumber; - private DataOutputStream commitLogWriter; - private long currentCommitLogSize; - - public BufferedPipeDataQueue(String pipeLogDir) { - this.pipeLogDir = pipeLogDir; - - this.lastMaxSerialNumber = 0; - this.pipeLogStartNumber = new LinkedBlockingDeque<>(); - - this.outputDeque = new LinkedBlockingDeque<>(); - this.pullSerialNumber = Long.MIN_VALUE; - this.commitSerialNumber = Long.MIN_VALUE; - - recover(); - } - - /** recover */ - private void recover() { - if (!new File(pipeLogDir).exists()) { - return; - } - - recoverPipeLogStartNumber(); - recoverLastMaxSerialNumber(); - recoverCommitSerialNumber(); - recoverOutputDeque(); - } - - private void recoverPipeLogStartNumber() { - File logDir = new File(pipeLogDir); - List<Long> startNumbers = new ArrayList<>(); - - for (File file : logDir.listFiles()) - if (file.getName().endsWith(SyncConstant.PIPE_LOG_NAME_SUFFIX) && file.length() > 0) { - startNumbers.add(SyncPathUtil.getSerialNumberFromPipeLogName(file.getName())); - } - if (!startNumbers.isEmpty()) { - Collections.sort(startNumbers); - for (Long startTime : startNumbers) { - pipeLogStartNumber.offer(startTime); - } - } - } - - private void recoverLastMaxSerialNumber() { - if (pipeLogStartNumber.isEmpty()) { - return; - } - - File writingPipeLog = - new File(pipeLogDir, SyncPathUtil.getPipeLogName(pipeLogStartNumber.peekLast())); - try { - List<PipeData> recoverPipeData = parsePipeLog(writingPipeLog); - int recoverPipeDataSize = recoverPipeData.size(); - lastMaxSerialNumber = - recoverPipeDataSize == 0 - ? pipeLogStartNumber.peekLast() - 1 - : recoverPipeData.get(recoverPipeDataSize - 1).getSerialNumber(); - } catch (IOException e) { - logger.error( - String.format("Can not recover inputQueue from %s.", writingPipeLog.getPath()), e); - } - } - - private void recoverCommitSerialNumber() { - File commitLog = new File(pipeLogDir, SyncConstant.COMMIT_LOG_NAME); - if (!commitLog.exists()) { - if (!pipeLogStartNumber.isEmpty()) { - commitSerialNumber = pipeLogStartNumber.peek() - 1; - } - return; - } - - try (RandomAccessFile raf = new RandomAccessFile(commitLog, "r")) { - if (raf.length() >= Long.BYTES) { - raf.seek(raf.length() - Long.BYTES); - commitSerialNumber = raf.readLong(); - } - } catch (IOException e) { - logger.error( - String.format( - "deserialize remove serial number error, remove serial number has been set to %d.", - commitSerialNumber), - e); - } - } - - private void recoverOutputDeque() { - if (pipeLogStartNumber.isEmpty()) { - return; - } - - File readingPipeLog = - new File(pipeLogDir, SyncPathUtil.getPipeLogName(pipeLogStartNumber.peek())); - try { - List<PipeData> recoverPipeData = parsePipeLog(readingPipeLog); - int recoverPipeDataSize = recoverPipeData.size(); - for (int i = recoverPipeDataSize - 1; i >= 0; --i) { - PipeData pipeData = recoverPipeData.get(i); - if (pipeData.getSerialNumber() <= commitSerialNumber) { - break; - } - outputDeque.addFirst(pipeData); - } - } catch (IOException e) { - logger.error( - String.format("Recover output deque from pipe log %s error.", readingPipeLog.getPath()), - e); - } - } - - public long getLastMaxSerialNumber() { - return lastMaxSerialNumber; - } - - public long getCommitSerialNumber() { - return commitSerialNumber; - } - - /** input */ - @Override - public boolean offer(PipeData pipeData) { - if (outputStream == null || currentPipeLogSize > SyncConstant.DEFAULT_PIPE_LOG_SIZE_IN_BYTE) { - try { - moveToNextPipeLog(pipeData.getSerialNumber()); - } catch (IOException e) { - logger.error(String.format("Move to next pipe log %s error.", pipeData), e); - } - } - synchronized (waitLock) { - if (!inputDeque.offer(pipeData)) { - waitLock.notifyAll(); - return false; - } - waitLock.notifyAll(); - } - - try { - writeToDisk(pipeData); - } catch (IOException e) { - logger.error(String.format("Record pipe data %s error.", pipeData), e); - return false; - } - return true; - } - - private synchronized void moveToNextPipeLog(long startSerialNumber) throws IOException { - if (outputStream != null) { - outputStream.close(); - } - File newPipeLog = new File(pipeLogDir, SyncPathUtil.getPipeLogName(startSerialNumber)); - SyncPathUtil.createFile(newPipeLog); - - outputStream = new DataOutputStream(new FileOutputStream(newPipeLog)); - pipeLogStartNumber.offer(startSerialNumber); - currentPipeLogSize = 0; - - inputDeque = new LinkedBlockingDeque<>(); - if (commitSerialNumber == Long.MIN_VALUE) { - commitSerialNumber = startSerialNumber - 1; - } - } - - private void writeToDisk(PipeData pipeData) throws IOException { - // skip trick - - currentPipeLogSize += pipeData.serialize(outputStream); - outputStream.flush(); - } - - /** output */ - private synchronized PipeData pullOnePipeData(long lastSerialNumber) throws IOException { - long serialNumber = lastSerialNumber + 1; - if (!outputDeque.isEmpty()) { - return outputDeque.poll(); - } else if (outputDeque != inputDeque) { - if (pipeLogStartNumber.isEmpty() || lastSerialNumber == Long.MIN_VALUE) { - return null; - } - - if (serialNumber > pipeLogStartNumber.peekLast()) { - return null; - } else if (serialNumber == pipeLogStartNumber.peekLast() && inputDeque != null) { - outputDeque = inputDeque; - } else { - long nextStartNumber = - pipeLogStartNumber.stream().filter(o -> o >= serialNumber).findFirst().get(); - List<PipeData> parsePipeData = - parsePipeLog(new File(pipeLogDir, SyncPathUtil.getPipeLogName(nextStartNumber))); - int parsePipeDataSize = parsePipeData.size(); - outputDeque = new LinkedBlockingDeque<>(); - for (int i = 0; i < parsePipeDataSize; i++) { - outputDeque.offer(parsePipeData.get(i)); - } - } - return outputDeque.poll(); - } - return null; - } - - @Override - public List<PipeData> pull(long serialNumber) { - throw new NotImplementedException("Not implement pull"); - } - - @Override - public PipeData take() throws InterruptedException { - PipeData pipeData = null; - try { - synchronized (waitLock) { - while ((pipeData = pullOnePipeData(commitSerialNumber)) == null) { - waitLock.wait(); - waitLock.notifyAll(); - } - } - } catch (IOException e) { - logger.error( - String.format("Blocking pull pipe data number %s error.", commitSerialNumber + 1), e); - } - outputDeque.addFirst(pipeData); - pullSerialNumber = pipeData.getSerialNumber(); - return pipeData; - } - - @Override - public void commit() { - if (pullSerialNumber == Long.MIN_VALUE) { - return; - } - commit(pullSerialNumber); - } - - @Override - public void commit(long serialNumber) { - deletePipeData(serialNumber); - deletePipeLog(); - serializeCommitSerialNumber(); - } - - private void deletePipeData(long serialNumber) { - while (commitSerialNumber < serialNumber) { - PipeData commitData = null; - try { - commitData = pullOnePipeData(commitSerialNumber); - if (commitData == null) { - return; - } - if (PipeData.PipeDataType.TSFILE.equals(commitData.getPipeDataType())) { - List<File> tsFiles = ((TsFilePipeData) commitData).getTsFiles(false); - for (File file : tsFiles) { - Files.deleteIfExists(file.toPath()); - } - } - } catch (IOException e) { - logger.error( - String.format("Commit pipe data serial number %s error.", commitSerialNumber), e); - } - if (commitData != null) { - commitSerialNumber = commitData.getSerialNumber(); - } - } - } - - private void deletePipeLog() { - if (pipeLogStartNumber.size() >= 2) { - long nowPipeLogStartNumber; - while (true) { - nowPipeLogStartNumber = pipeLogStartNumber.poll(); - if (!pipeLogStartNumber.isEmpty() && pipeLogStartNumber.peek() <= commitSerialNumber) { - try { - Files.deleteIfExists( - new File(pipeLogDir, SyncPathUtil.getPipeLogName(nowPipeLogStartNumber)).toPath()); - } catch (IOException e) { - logger.warn(String.format("Delete %s-pipe.log error.", nowPipeLogStartNumber), e); - } - } else { - break; - } - } - pipeLogStartNumber.addFirst(nowPipeLogStartNumber); - } - } - - private void serializeCommitSerialNumber() { - try { - if (commitLogWriter == null) { - commitLogWriter = - new DataOutputStream( - new FileOutputStream(new File(pipeLogDir, SyncConstant.COMMIT_LOG_NAME))); - currentCommitLogSize = 0; - } - commitLogWriter.writeLong(commitSerialNumber); - commitLogWriter.flush(); - currentCommitLogSize += Long.BYTES; - if (currentCommitLogSize >= SyncConstant.DEFAULT_PIPE_LOG_SIZE_IN_BYTE) { - commitLogWriter.close(); - commitLogWriter = null; - } - } catch (IOException e) { - logger.error( - String.format("Serialize commit serial number %s error.", commitSerialNumber), e); - } - } - - /** common */ - @Override - public synchronized boolean isEmpty() { - if (outputDeque == null || pipeLogStartNumber.isEmpty()) { - return true; - } - return pipeLogStartNumber.size() == 1 - && outputDeque.isEmpty() - && (inputDeque == null || inputDeque.isEmpty()); - } - - @Override - public void close() { - try { - if (outputStream != null) { - outputStream.close(); - outputStream = null; - } - if (commitLogWriter != null) { - commitLogWriter.close(); - commitLogWriter = null; - } - - inputDeque = null; - pipeLogStartNumber = null; - outputDeque = null; - } catch (IOException e) { - logger.warn(String.format("Close pipe log dir %s error.", pipeLogDir), e); - } - } - - @Override - public void clear() { - close(); - - File logDir = new File(pipeLogDir); - if (logDir.exists()) { - FileUtils.deleteDirectory(logDir); - } - } - - public static List<PipeData> parsePipeLog(File file) throws IOException { - List<PipeData> pipeData = new ArrayList<>(); - try (DataInputStream inputStream = new DataInputStream(new FileInputStream(file))) { - while (true) { - pipeData.add(PipeData.createPipeData(inputStream)); - } - } catch (EOFException e) { - } catch (IllegalPathException e) { - logger.error(String.format("Parsing pipeLog %s error.", file.getPath()), e); - throw new IOException(e); - } - return pipeData; - } -} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/queue/PipeDataQueue.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/queue/PipeDataQueue.java deleted file mode 100644 index 08ecfe0f3ce..00000000000 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/queue/PipeDataQueue.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iotdb.db.pipe.connector.legacy.pipedata.queue; - -import org.apache.iotdb.db.pipe.connector.legacy.pipedata.PipeData; - -import java.util.List; - -public interface PipeDataQueue { - boolean offer(PipeData data); - - List<PipeData> pull(long serialNumber); - - PipeData take() throws InterruptedException; - - void commit(); - - void commit(long serialNumber); - - boolean isEmpty(); - - void close(); - - void clear(); -} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/utils/SyncConstant.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/utils/SyncConstant.java index cfac1eec568..baa87f17f7e 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/utils/SyncConstant.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/utils/SyncConstant.java @@ -20,31 +20,12 @@ package org.apache.iotdb.db.pipe.connector.legacy.utils; public class SyncConstant { /** common */ - public static final String SYNC_SYS_DIR = "sys"; - public static final String FILE_DATA_DIR_NAME = "file-data"; - // pipe log: serialNumber + SEPARATOR + SUFFIX - public static final String PIPE_LOG_DIR_NAME = "pipe-log"; public static final String PIPE_LOG_NAME_SEPARATOR = "_"; public static final String PIPE_LOG_NAME_SUFFIX = PIPE_LOG_NAME_SEPARATOR + "pipe.log"; - public static final String COMMIT_LOG_NAME = "commit.log"; - public static final Long DEFAULT_PIPE_LOG_SIZE_IN_BYTE = 10485760L; - - // persistence - - public static final String SYNC_LOG_NAME = "syncService.log"; - - /** sender */ - - // dir structure - public static final String SENDER_DIR_NAME = "sender"; - - public static final String HISTORY_PIPE_LOG_DIR_NAME = "history-" + PIPE_LOG_DIR_NAME; // data config - public static final String DEFAULT_PIPE_SINK_IP = "127.0.0.1"; - public static final int DEFAULT_PIPE_SINK_PORT = 6667; public static final Long DEFAULT_WAITING_FOR_TSFILE_CLOSE_MILLISECONDS = 500L; public static final Long DEFAULT_WAITING_FOR_TSFILE_RETRY_NUMBER = 10L; @@ -52,11 +33,6 @@ public class SyncConstant { /** transport */ public static final String PATCH_SUFFIX = ".patch"; - public static final String IPV4_PATTERN = - "^([1-9]|[1-9]\\d|1\\d{2}|2[0-4]\\d|25[0-5])(\\.(\\d|[1-9]\\d|1\\d{2}|2[0-4]\\d|25[0-5])){3}$"; - /** receiver */ public static final String RECEIVER_DIR_NAME = "receiver"; - - public static final String IP_SEPARATOR = "\\."; } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/utils/SyncPathUtil.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/utils/SyncPathUtil.java index 057ca641930..39b334c51ca 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/utils/SyncPathUtil.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/utils/SyncPathUtil.java @@ -23,7 +23,6 @@ import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.db.pipe.connector.legacy.transport.SyncIdentityInfo; import java.io.File; -import java.io.IOException; /** Util for path generation in sync module */ public class SyncPathUtil { @@ -72,20 +71,4 @@ public class SyncPathUtil { return SyncPathUtil.getReceiverFileDataDir( identityInfo.getPipeName(), identityInfo.getRemoteAddress(), identityInfo.getCreateTime()); } - - /** common */ - public static String getPipeLogName(long serialNumber) { - return serialNumber + SyncConstant.PIPE_LOG_NAME_SUFFIX; - } - - public static Long getSerialNumberFromPipeLogName(String pipeLogName) { - return Long.parseLong(pipeLogName.split(SyncConstant.PIPE_LOG_NAME_SEPARATOR)[0]); - } - - public static boolean createFile(File file) throws IOException { - if (!file.getParentFile().exists()) { - file.getParentFile().mkdirs(); - } - return file.createNewFile(); - } }
