This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch remove-sync-entry in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit fc06d7750bf2bf24f9c7d85549d86176007a490c Author: Steve Yurong Su <[email protected]> AuthorDate: Sat Jun 17 03:34:56 2023 +0800 refactor --- .../request/write/sync/PreCreatePipePlanV1.java | 2 +- .../request/write/sync/RecordPipeMessagePlan.java | 2 +- .../request/write/sync/SetPipeStatusPlanV1.java | 2 +- .../procedure/impl/sync/CreatePipeProcedure.java | 3 +- .../procedure/impl/sync/DropPipeProcedure.java | 1 + .../procedure/impl/sync/StartPipeProcedure.java | 3 +- .../procedure/impl/sync/StopPipeProcedure.java | 3 +- .../confignode/procedure/store/ProcedureType.java | 2 +- .../request/ConfigPhysicalPlanSerDeTest.java | 8 +- .../exception/sync/PipeAlreadyExistException.java | 31 --- .../commons/exception/sync/PipeException.java | 36 --- .../exception/sync/PipeNotExistException.java | 25 --- .../exception/sync/PipeServerException.java | 34 --- .../sync/PipeSinkAlreadyExistException.java | 25 --- .../exception/sync/PipeSinkBeingUsedException.java | 27 --- .../commons/exception/sync/PipeSinkException.java | 42 ---- .../exception/sync/PipeSinkNotExistException.java | 25 --- .../exception/sync/SyncConnectionException.java | 39 ---- .../exception/sync/SyncHandshakeException.java | 26 --- .../iotdb/commons/sync/{pipe => }/PipeInfo.java | 46 +--- .../iotdb/commons/sync/{pipe => }/PipeMessage.java | 5 +- .../iotdb/commons/sync/{pipe => }/PipeStatus.java | 3 +- .../commons/sync/{pipe => }/TsFilePipeInfo.java | 36 +-- .../iotdb/commons/sync/metadata/SyncMetadata.java | 241 --------------------- .../commons/sync/persistence/SyncLogReader.java | 111 ---------- .../commons/sync/persistence/SyncLogWriter.java | 94 -------- .../iotdb/commons/sync/pipe/SyncOperation.java | 29 --- .../iotdb/commons/sync/pipesink/IoTDBPipeSink.java | 166 -------------- .../iotdb/commons/sync/pipesink/PipeSink.java | 88 -------- .../commons/sync/metedata/SyncMetadataTest.java | 189 ---------------- .../connector/lagacy/IoTDBSyncReceiverV1_1.java | 10 +- .../lagacy/exception/SyncDataLoadException.java | 9 +- .../connector/lagacy/pipedata/TsFilePipeData.java | 2 +- .../lagacy/pipedata/load/DeletionLoader.java | 8 +- .../connector/lagacy/pipedata/load/ILoader.java | 4 +- .../lagacy/pipedata/load/TsFileLoader.java | 6 +- .../pipedata/queue/BufferedPipeDataQueue.java | 4 +- .../lagacy}/transport/SyncIdentityInfo.java | 2 +- .../pipe/connector/lagacy}/utils/SyncConstant.java | 31 +-- .../pipe/connector/lagacy}/utils/SyncPathUtil.java | 69 +----- 40 files changed, 53 insertions(+), 1436 deletions(-) diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/PreCreatePipePlanV1.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/PreCreatePipePlanV1.java index 24c3125fc11..849f4947ac6 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/PreCreatePipePlanV1.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/PreCreatePipePlanV1.java @@ -18,7 +18,7 @@ */ package org.apache.iotdb.confignode.consensus.request.write.sync; -import org.apache.iotdb.commons.sync.pipe.PipeInfo; +import org.apache.iotdb.commons.sync.PipeInfo; import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan; import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType; diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/RecordPipeMessagePlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/RecordPipeMessagePlan.java index 3fe6b8c8fae..da5c542cf50 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/RecordPipeMessagePlan.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/RecordPipeMessagePlan.java @@ -18,7 +18,7 @@ */ package org.apache.iotdb.confignode.consensus.request.write.sync; -import org.apache.iotdb.commons.sync.pipe.PipeMessage; +import org.apache.iotdb.commons.sync.PipeMessage; import org.apache.iotdb.commons.utils.BasicStructureSerDeUtil; import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan; import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType; diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/SetPipeStatusPlanV1.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/SetPipeStatusPlanV1.java index 4aa0f9668e6..0f0472cc22f 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/SetPipeStatusPlanV1.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/SetPipeStatusPlanV1.java @@ -19,7 +19,7 @@ package org.apache.iotdb.confignode.consensus.request.write.sync; -import org.apache.iotdb.commons.sync.pipe.PipeStatus; +import org.apache.iotdb.commons.sync.PipeStatus; import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan; import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/CreatePipeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/CreatePipeProcedure.java index b1f22e57111..2e0f490222b 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/CreatePipeProcedure.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/CreatePipeProcedure.java @@ -18,7 +18,7 @@ */ package org.apache.iotdb.confignode.procedure.impl.sync; -import org.apache.iotdb.commons.sync.pipe.PipeInfo; +import org.apache.iotdb.commons.sync.PipeInfo; import org.apache.iotdb.confignode.procedure.Procedure; import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; import org.apache.iotdb.confignode.procedure.exception.ProcedureException; @@ -35,6 +35,7 @@ import java.util.Objects; import java.util.Set; // Empty procedure for old sync, restored only for compatibility +@Deprecated public class CreatePipeProcedure extends Procedure<ConfigNodeProcedureEnv> { private PipeInfo pipeInfo; diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/DropPipeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/DropPipeProcedure.java index aa61bc8949f..96d6bc9dcc6 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/DropPipeProcedure.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/DropPipeProcedure.java @@ -32,6 +32,7 @@ import java.nio.ByteBuffer; import java.util.Objects; // Empty procedure for old sync, restored only for compatibility +@Deprecated public class DropPipeProcedure extends Procedure<ConfigNodeProcedureEnv> { private String pipeName; diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/StartPipeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/StartPipeProcedure.java index 658d319cbd3..5d24aca8b92 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/StartPipeProcedure.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/StartPipeProcedure.java @@ -18,7 +18,7 @@ */ package org.apache.iotdb.confignode.procedure.impl.sync; -import org.apache.iotdb.commons.sync.pipe.PipeInfo; +import org.apache.iotdb.commons.sync.PipeInfo; import org.apache.iotdb.confignode.procedure.Procedure; import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; import org.apache.iotdb.confignode.procedure.exception.ProcedureException; @@ -35,6 +35,7 @@ import java.util.Objects; import java.util.Set; // Empty procedure for old sync, used only for compatibility +@Deprecated public class StartPipeProcedure extends Procedure<ConfigNodeProcedureEnv> { private String pipeName; diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/StopPipeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/StopPipeProcedure.java index 88f4d61edd8..c26caeeca9a 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/StopPipeProcedure.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/StopPipeProcedure.java @@ -18,7 +18,7 @@ */ package org.apache.iotdb.confignode.procedure.impl.sync; -import org.apache.iotdb.commons.sync.pipe.PipeInfo; +import org.apache.iotdb.commons.sync.PipeInfo; import org.apache.iotdb.confignode.procedure.Procedure; import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; import org.apache.iotdb.confignode.procedure.exception.ProcedureException; @@ -35,6 +35,7 @@ import java.util.Objects; import java.util.Set; // Empty procedure for old sync, used only for compatibility +@Deprecated public class StopPipeProcedure extends Procedure<ConfigNodeProcedureEnv> { private String pipeName; diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java index 31bb30aa7c4..92a0a4d87b8 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java @@ -44,7 +44,7 @@ public enum ProcedureType { CREATE_TRIGGER_PROCEDURE((short) 400), DROP_TRIGGER_PROCEDURE((short) 401), - /** Old sync */ + /** Legacy enum for sync */ CREATE_PIPE_PROCEDURE((short) 500), START_PIPE_PROCEDURE((short) 501), STOP_PIPE_PROCEDURE((short) 502), diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java index 9c3be2ac3a1..a60b8605d0e 100644 --- a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java +++ b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java @@ -45,10 +45,10 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeMeta; import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta; import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta; import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta; -import org.apache.iotdb.commons.sync.pipe.PipeInfo; -import org.apache.iotdb.commons.sync.pipe.PipeMessage; -import org.apache.iotdb.commons.sync.pipe.PipeStatus; -import org.apache.iotdb.commons.sync.pipe.TsFilePipeInfo; +import org.apache.iotdb.commons.sync.PipeInfo; +import org.apache.iotdb.commons.sync.PipeMessage; +import org.apache.iotdb.commons.sync.PipeStatus; +import org.apache.iotdb.commons.sync.TsFilePipeInfo; import org.apache.iotdb.commons.trigger.TriggerInformation; import org.apache.iotdb.confignode.consensus.request.auth.AuthorPlan; import org.apache.iotdb.confignode.consensus.request.read.database.CountDatabasePlan; diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/exception/sync/PipeAlreadyExistException.java b/node-commons/src/main/java/org/apache/iotdb/commons/exception/sync/PipeAlreadyExistException.java deleted file mode 100644 index 0cfa5fec0d4..00000000000 --- a/node-commons/src/main/java/org/apache/iotdb/commons/exception/sync/PipeAlreadyExistException.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iotdb.commons.exception.sync; - -import org.apache.iotdb.commons.sync.pipe.PipeStatus; - -public class PipeAlreadyExistException extends PipeException { - public PipeAlreadyExistException(String pipeName) { - super(String.format("PIPE [%s] already exists in IoTDB.", pipeName)); - } - - public PipeAlreadyExistException(String pipeName, PipeStatus status) { - super(String.format("PIPE [%s] is %s, please retry after drop it.", pipeName, status.name())); - } -} diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/exception/sync/PipeException.java b/node-commons/src/main/java/org/apache/iotdb/commons/exception/sync/PipeException.java deleted file mode 100644 index 4c4347ebd2b..00000000000 --- a/node-commons/src/main/java/org/apache/iotdb/commons/exception/sync/PipeException.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.iotdb.commons.exception.sync; - -import org.apache.iotdb.commons.exception.IoTDBException; -import org.apache.iotdb.rpc.TSStatusCode; - -public class PipeException extends IoTDBException { - - private static final long serialVersionUID = -7312720445194413492L; - - public PipeException(String message, int errorCode) { - super(message, errorCode); - } - - public PipeException(String message) { - super(message, TSStatusCode.PIPE_ERROR.getStatusCode()); - } -} diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/exception/sync/PipeNotExistException.java b/node-commons/src/main/java/org/apache/iotdb/commons/exception/sync/PipeNotExistException.java deleted file mode 100644 index 23a0637a516..00000000000 --- a/node-commons/src/main/java/org/apache/iotdb/commons/exception/sync/PipeNotExistException.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iotdb.commons.exception.sync; - -public class PipeNotExistException extends PipeException { - public PipeNotExistException(String pipeName) { - super(String.format("PIPE [%s] does not exist", pipeName)); - } -} diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/exception/sync/PipeServerException.java b/node-commons/src/main/java/org/apache/iotdb/commons/exception/sync/PipeServerException.java deleted file mode 100644 index 45e2271539e..00000000000 --- a/node-commons/src/main/java/org/apache/iotdb/commons/exception/sync/PipeServerException.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.iotdb.commons.exception.sync; - -import org.apache.iotdb.commons.exception.IoTDBException; -import org.apache.iotdb.rpc.TSStatusCode; - -public class PipeServerException extends IoTDBException { - - public PipeServerException(String message, int errorCode) { - super(message, errorCode); - } - - public PipeServerException(String message) { - this(message, TSStatusCode.PIPESERVER_ERROR.getStatusCode()); - } -} diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/exception/sync/PipeSinkAlreadyExistException.java b/node-commons/src/main/java/org/apache/iotdb/commons/exception/sync/PipeSinkAlreadyExistException.java deleted file mode 100644 index 535a685b561..00000000000 --- a/node-commons/src/main/java/org/apache/iotdb/commons/exception/sync/PipeSinkAlreadyExistException.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iotdb.commons.exception.sync; - -public class PipeSinkAlreadyExistException extends PipeSinkException { - public PipeSinkAlreadyExistException(String pipeSinkName) { - super(String.format("PIPESINK [%s] already exists in IoTDB.", pipeSinkName)); - } -} diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/exception/sync/PipeSinkBeingUsedException.java b/node-commons/src/main/java/org/apache/iotdb/commons/exception/sync/PipeSinkBeingUsedException.java deleted file mode 100644 index 5ba3a29013b..00000000000 --- a/node-commons/src/main/java/org/apache/iotdb/commons/exception/sync/PipeSinkBeingUsedException.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iotdb.commons.exception.sync; - -public class PipeSinkBeingUsedException extends PipeSinkException { - public PipeSinkBeingUsedException(String pipeSinkName, String pipeName) { - super( - String.format( - "Can not drop PIPESINK [%s], because PIPE [%s] is using it.", pipeSinkName, pipeName)); - } -} diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/exception/sync/PipeSinkException.java b/node-commons/src/main/java/org/apache/iotdb/commons/exception/sync/PipeSinkException.java deleted file mode 100644 index f2711b0f793..00000000000 --- a/node-commons/src/main/java/org/apache/iotdb/commons/exception/sync/PipeSinkException.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.iotdb.commons.exception.sync; - -import org.apache.iotdb.commons.exception.IoTDBException; -import org.apache.iotdb.rpc.TSStatusCode; - -public class PipeSinkException extends IoTDBException { - - private static final long serialVersionUID = -2355881952697245662L; - - public PipeSinkException(String message, int errorCode) { - super(message, errorCode); - } - - public PipeSinkException(String message) { - super(message, TSStatusCode.CREATE_PIPE_SINK_ERROR.getStatusCode()); - } - - public PipeSinkException(String attr, String value, String attrType) { - super( - String.format("%s=%s has wrong format, require for %s.", attr, value, attrType), - TSStatusCode.CREATE_PIPE_SINK_ERROR.getStatusCode()); - } -} diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/exception/sync/PipeSinkNotExistException.java b/node-commons/src/main/java/org/apache/iotdb/commons/exception/sync/PipeSinkNotExistException.java deleted file mode 100644 index 31c30c3173a..00000000000 --- a/node-commons/src/main/java/org/apache/iotdb/commons/exception/sync/PipeSinkNotExistException.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iotdb.commons.exception.sync; - -public class PipeSinkNotExistException extends PipeSinkException { - public PipeSinkNotExistException(String pipeSinkName) { - super(String.format("PIPESINK [%s] does not exist", pipeSinkName)); - } -} diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/exception/sync/SyncConnectionException.java b/node-commons/src/main/java/org/apache/iotdb/commons/exception/sync/SyncConnectionException.java deleted file mode 100644 index 7a88fa1a8ad..00000000000 --- a/node-commons/src/main/java/org/apache/iotdb/commons/exception/sync/SyncConnectionException.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iotdb.commons.exception.sync; - -import org.apache.iotdb.commons.exception.IoTDBException; -import org.apache.iotdb.rpc.TSStatusCode; - -public class SyncConnectionException extends IoTDBException { - - private static final long serialVersionUID = -6661904365503849681L; - - public SyncConnectionException(String message) { - super(message, TSStatusCode.SYNC_CONNECTION_ERROR.getStatusCode()); - } - - public SyncConnectionException(String message, Throwable cause) { - super(message + cause.getMessage(), TSStatusCode.SYNC_CONNECTION_ERROR.getStatusCode()); - } - - public SyncConnectionException(Throwable cause) { - super(cause.getMessage(), TSStatusCode.SYNC_CONNECTION_ERROR.getStatusCode()); - } -} diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/exception/sync/SyncHandshakeException.java b/node-commons/src/main/java/org/apache/iotdb/commons/exception/sync/SyncHandshakeException.java deleted file mode 100644 index 61903d69ad5..00000000000 --- a/node-commons/src/main/java/org/apache/iotdb/commons/exception/sync/SyncHandshakeException.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iotdb.commons.exception.sync; - -public class SyncHandshakeException extends SyncConnectionException { - - public SyncHandshakeException(String message) { - super(message); - } -} diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/PipeInfo.java b/node-commons/src/main/java/org/apache/iotdb/commons/sync/PipeInfo.java similarity index 79% rename from node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/PipeInfo.java rename to node-commons/src/main/java/org/apache/iotdb/commons/sync/PipeInfo.java index ca579073546..88282635321 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/PipeInfo.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/sync/PipeInfo.java @@ -16,10 +16,9 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.commons.sync.pipe; +package org.apache.iotdb.commons.sync; import org.apache.iotdb.commons.exception.runtime.SerializationRunTimeException; -import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo; import org.apache.iotdb.tsfile.utils.PublicBAOS; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; @@ -32,6 +31,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; +@Deprecated public abstract class PipeInfo { private static final Logger LOGGER = LoggerFactory.getLogger(PipeInfo.class); @@ -70,14 +70,6 @@ public abstract class PipeInfo { this.pipeName = pipeName; } - public String getPipeSinkName() { - return pipeSinkName; - } - - public void setPipeSinkName(String pipeSinkName) { - this.pipeSinkName = pipeSinkName; - } - public PipeStatus getStatus() { return status; } @@ -86,24 +78,6 @@ public abstract class PipeInfo { this.status = status; } - public PipeMessage.PipeMessageType getMessageType() { - return messageType; - } - - public void setMessageType(PipeMessage.PipeMessageType messageType) { - this.messageType = messageType; - } - - public long getCreateTime() { - return createTime; - } - - public void setCreateTime(long createTime) { - this.createTime = createTime; - } - - public abstract TShowPipeInfo getTShowPipeInfo(); - public void serialize(OutputStream outputStream) throws IOException { ReadWriteIOUtils.write((byte) getType().ordinal(), outputStream); ReadWriteIOUtils.write(pipeName, outputStream); @@ -140,22 +114,6 @@ public abstract class PipeInfo { } } - public static PipeInfo deserializePipeInfo(InputStream inputStream) throws IOException { - PipeType pipeType = PipeType.values()[ReadWriteIOUtils.readByte(inputStream)]; - PipeInfo pipeInfo; - switch (pipeType) { - case TsFilePipe: - pipeInfo = new TsFilePipeInfo(); - pipeInfo.deserialize(inputStream); - break; - case WALPipe: - default: - throw new UnsupportedOperationException( - String.format("Can not recognize PipeType %s.", pipeType.name())); - } - return pipeInfo; - } - public static PipeInfo deserializePipeInfo(ByteBuffer byteBuffer) { PipeType pipeType = PipeType.values()[ReadWriteIOUtils.readByte(byteBuffer)]; PipeInfo pipeInfo; diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/PipeMessage.java b/node-commons/src/main/java/org/apache/iotdb/commons/sync/PipeMessage.java similarity index 97% rename from node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/PipeMessage.java rename to node-commons/src/main/java/org/apache/iotdb/commons/sync/PipeMessage.java index b0f636fa56e..e386d751adb 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/PipeMessage.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/sync/PipeMessage.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.commons.sync.pipe; +package org.apache.iotdb.commons.sync; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; @@ -25,6 +25,7 @@ import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +@Deprecated public class PipeMessage { private final String message; private final PipeMessageType type; @@ -70,7 +71,7 @@ public class PipeMessage { WARN((byte) 2), ERROR((byte) 3); - private byte type; + private final byte type; PipeMessageType(byte type) { this.type = type; diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/PipeStatus.java b/node-commons/src/main/java/org/apache/iotdb/commons/sync/PipeStatus.java similarity index 97% rename from node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/PipeStatus.java rename to node-commons/src/main/java/org/apache/iotdb/commons/sync/PipeStatus.java index 64253b21d96..a10c571afb1 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/PipeStatus.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/sync/PipeStatus.java @@ -16,8 +16,9 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.commons.sync.pipe; +package org.apache.iotdb.commons.sync; +@Deprecated public enum PipeStatus { // a new pipe should be stop status diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/TsFilePipeInfo.java b/node-commons/src/main/java/org/apache/iotdb/commons/sync/TsFilePipeInfo.java similarity index 79% rename from node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/TsFilePipeInfo.java rename to node-commons/src/main/java/org/apache/iotdb/commons/sync/TsFilePipeInfo.java index a45c7579fc1..5a3c764e8e5 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/TsFilePipeInfo.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/sync/TsFilePipeInfo.java @@ -16,9 +16,8 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.commons.sync.pipe; +package org.apache.iotdb.commons.sync; -import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; import java.io.IOException; @@ -27,6 +26,7 @@ import java.io.OutputStream; import java.nio.ByteBuffer; import java.util.Objects; +@Deprecated public class TsFilePipeInfo extends PipeInfo { private boolean syncDelOp; private long dataStartTimestamp; @@ -34,17 +34,6 @@ public class TsFilePipeInfo extends PipeInfo { // only used for serialization public TsFilePipeInfo() {} - public TsFilePipeInfo( - String pipeName, - String pipeSinkName, - long createTime, - long dataStartTimestamp, - boolean syncDelOp) { - super(pipeName, pipeSinkName, createTime); - this.dataStartTimestamp = dataStartTimestamp; - this.syncDelOp = syncDelOp; - } - public TsFilePipeInfo( String pipeName, String pipeSinkName, @@ -57,32 +46,11 @@ public class TsFilePipeInfo extends PipeInfo { this.syncDelOp = syncDelOp; } - public boolean isSyncDelOp() { - return syncDelOp; - } - - public void setSyncDelOp(boolean syncDelOp) { - this.syncDelOp = syncDelOp; - } - - public long getDataStartTimestamp() { - return dataStartTimestamp; - } - - public void setDataStartTimestamp(long dataStartTimestamp) { - this.dataStartTimestamp = dataStartTimestamp; - } - @Override PipeType getType() { return PipeType.TsFilePipe; } - @Override - public TShowPipeInfo getTShowPipeInfo() { - return new TShowPipeInfo(); - } - @Override public void serialize(OutputStream outputStream) throws IOException { super.serialize(outputStream); diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/metadata/SyncMetadata.java b/node-commons/src/main/java/org/apache/iotdb/commons/sync/metadata/SyncMetadata.java deleted file mode 100644 index e9d9e0f9c8a..00000000000 --- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/metadata/SyncMetadata.java +++ /dev/null @@ -1,241 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iotdb.commons.sync.metadata; - -import org.apache.iotdb.commons.exception.sync.PipeAlreadyExistException; -import org.apache.iotdb.commons.exception.sync.PipeException; -import org.apache.iotdb.commons.exception.sync.PipeNotExistException; -import org.apache.iotdb.commons.exception.sync.PipeSinkAlreadyExistException; -import org.apache.iotdb.commons.exception.sync.PipeSinkBeingUsedException; -import org.apache.iotdb.commons.exception.sync.PipeSinkException; -import org.apache.iotdb.commons.exception.sync.PipeSinkNotExistException; -import org.apache.iotdb.commons.snapshot.SnapshotProcessor; -import org.apache.iotdb.commons.sync.persistence.SyncLogReader; -import org.apache.iotdb.commons.sync.persistence.SyncLogWriter; -import org.apache.iotdb.commons.sync.pipe.PipeInfo; -import org.apache.iotdb.commons.sync.pipe.PipeMessage; -import org.apache.iotdb.commons.sync.pipe.PipeStatus; -import org.apache.iotdb.commons.sync.pipe.SyncOperation; -import org.apache.iotdb.commons.sync.pipesink.PipeSink; -import org.apache.iotdb.commons.sync.utils.SyncConstant; - -import org.apache.thrift.TException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; - -public class SyncMetadata implements SnapshotProcessor { - - private static final Logger LOGGER = LoggerFactory.getLogger(SyncMetadata.class); - - // <PipeSinkName, PipeSink> - private Map<String, PipeSink> pipeSinks; - - // <PipeName, PipeInfo> - private Map<String, PipeInfo> pipes; - - public SyncMetadata() { - this.pipes = new ConcurrentHashMap<>(); - this.pipeSinks = new ConcurrentHashMap<>(); - } - - // ====================================================== - // region Implement of Getter and Setter - // ====================================================== - - public Map<String, PipeSink> getPipeSinks() { - return pipeSinks; - } - - public void setPipeSinks(Map<String, PipeSink> pipeSinks) { - this.pipeSinks = pipeSinks; - } - - public Map<String, PipeInfo> getPipes() { - return pipes; - } - - public void setPipes(Map<String, PipeInfo> pipes) { - this.pipes = pipes; - } - - // endregion - - // ====================================================== - // region Implement of PipeSink - // ====================================================== - - public boolean isPipeSinkExist(String name) { - return pipeSinks.containsKey(name); - } - - public void checkPipeSinkNoExist(String pipeSinkName) throws PipeSinkException { - if (isPipeSinkExist(pipeSinkName)) { - throw new PipeSinkAlreadyExistException(pipeSinkName); - } - } - - public void addPipeSink(PipeSink pipeSink) { - pipeSinks.put(pipeSink.getPipeSinkName(), pipeSink); - } - - public void checkDropPipeSink(String pipeSinkName) throws PipeSinkException { - if (!isPipeSinkExist(pipeSinkName)) { - throw new PipeSinkNotExistException(pipeSinkName); - } - for (PipeInfo pipeInfo : pipes.values()) { - if (pipeInfo.getPipeSinkName().equals(pipeSinkName)) { - throw new PipeSinkBeingUsedException(pipeSinkName, pipeInfo.getPipeName()); - } - } - } - - public void dropPipeSink(String name) { - pipeSinks.remove(name); - } - - public PipeSink getPipeSink(String name) { - return pipeSinks.getOrDefault(name, null); - } - - public List<PipeSink> getAllPipeSink() { - List<PipeSink> allPipeSinks = new ArrayList<>(); - for (Map.Entry<String, PipeSink> entry : pipeSinks.entrySet()) { - allPipeSinks.add(entry.getValue()); - } - return allPipeSinks; - } - - // endregion - - // ====================================================== - // region Implement of Pipe - // ====================================================== - - public void checkAddPipe(PipeInfo pipeInfo) throws PipeException, PipeSinkNotExistException { - // check PipeSink exists - if (!isPipeSinkExist(pipeInfo.getPipeSinkName())) { - throw new PipeSinkNotExistException(pipeInfo.getPipeSinkName()); - } - // check Pipe does not exist - if (pipes.containsKey(pipeInfo.getPipeName())) { - PipeInfo runningPipe = pipes.get(pipeInfo.getPipeName()); - throw new PipeAlreadyExistException(runningPipe.getPipeName(), runningPipe.getStatus()); - } - } - - public void addPipe(PipeInfo pipeInfo) { - pipes.putIfAbsent(pipeInfo.getPipeName(), pipeInfo); - } - - public void dropPipe(String pipeName) { - pipes.remove(pipeName); - } - - public void setPipeStatus(String pipeName, PipeStatus status) { - pipes.get(pipeName).setStatus(status); - if (status.equals(PipeStatus.RUNNING)) { - pipes.get(pipeName).setMessageType(PipeMessage.PipeMessageType.NORMAL); - } - } - - public PipeInfo getPipeInfo(String pipeName) { - return pipes.get(pipeName); - } - - public List<PipeInfo> getAllPipeInfos() { - return new ArrayList<>(pipes.values()); - } - - public void checkIfPipeExist(String pipeName) throws PipeException { - if (!pipes.containsKey(pipeName)) { - throw new PipeNotExistException(pipeName); - } - } - - /** - * Change Pipe Message. It will record the most important message about one pipe. ERROR > WARN > - * NORMAL. - * - * @param pipeName name of pipe - * @param messageType pipe message type - */ - public void changePipeMessage(String pipeName, PipeMessage.PipeMessageType messageType) { - if (messageType.compareTo(pipes.get(pipeName).getMessageType()) > 0) { - pipes.get(pipeName).setMessageType(messageType); - } - } - - @Override - public boolean processTakeSnapshot(File snapshotDir) throws TException, IOException { - File snapshotFile = new File(snapshotDir, SyncConstant.SYNC_LOG_NAME); - if (snapshotFile.exists() && snapshotFile.isFile()) { - LOGGER.error( - "Failed to take snapshot, because snapshot file [{}] is already exist.", - snapshotFile.getAbsolutePath()); - return false; - } - File tmpFile = new File(snapshotFile.getAbsolutePath() + "-" + UUID.randomUUID()); - try (SyncLogWriter writer = new SyncLogWriter(snapshotDir, tmpFile.getName())) { - writer.initOutputStream(); - for (PipeSink pipeSink : pipeSinks.values()) { - writer.addPipeSink(pipeSink); - } - for (PipeInfo pipeInfo : pipes.values()) { - writer.addPipe(pipeInfo); - switch (pipeInfo.getStatus()) { - case RUNNING: - writer.operatePipe(pipeInfo.getPipeName(), SyncOperation.START_PIPE); - break; - case STOP: - writer.operatePipe(pipeInfo.getPipeName(), SyncOperation.STOP_PIPE); - break; - default: - break; - } - } - } - return tmpFile.renameTo(snapshotFile); - } - - @Override - public void processLoadSnapshot(File snapshotDir) throws TException, IOException { - File snapshotFile = new File(snapshotDir, SyncConstant.SYNC_LOG_NAME); - if (!snapshotFile.exists() || !snapshotFile.isFile()) { - LOGGER.error( - "Failed to load snapshot,snapshot file [{}] is not exist.", - snapshotFile.getAbsolutePath()); - return; - } - SyncLogReader reader = new SyncLogReader(snapshotDir, snapshotFile.getName()); - reader.recover(); - setPipes(reader.getPipes()); - setPipeSinks(reader.getAllPipeSinks()); - } - - // endregion - -} diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/persistence/SyncLogReader.java b/node-commons/src/main/java/org/apache/iotdb/commons/sync/persistence/SyncLogReader.java deleted file mode 100644 index 3ddd1f82f52..00000000000 --- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/persistence/SyncLogReader.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iotdb.commons.sync.persistence; - -import org.apache.iotdb.commons.sync.pipe.PipeInfo; -import org.apache.iotdb.commons.sync.pipe.PipeStatus; -import org.apache.iotdb.commons.sync.pipe.SyncOperation; -import org.apache.iotdb.commons.sync.pipesink.PipeSink; -import org.apache.iotdb.commons.sync.utils.SyncConstant; -import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -public class SyncLogReader { - private static final Logger logger = LoggerFactory.getLogger(SyncLogReader.class); - // <pipeSinkName, PipeSink> - private final Map<String, PipeSink> pipeSinks = new ConcurrentHashMap<>(); - // <pipeName, Pipe> - private final Map<String, PipeInfo> pipes = new ConcurrentHashMap<>(); - private final File dir; - private final String fileName; - - public SyncLogReader(File dir) { - this.dir = dir; - this.fileName = SyncConstant.SYNC_LOG_NAME; - } - - public SyncLogReader(File dir, String fileName) { - this.dir = dir; - this.fileName = fileName; - } - - public void recover() throws IOException { - logger.info("Start to recover all sync state for sync."); - File serviceLogFile = new File(dir, fileName); - if (!serviceLogFile.exists()) { - logger.warn("Sync service log file not found"); - } else { - try (InputStream inputStream = new FileInputStream(serviceLogFile)) { - recoverPipe(inputStream); - } - } - } - - public Map<String, PipeSink> getAllPipeSinks() { - return pipeSinks; - } - - public Map<String, PipeInfo> getPipes() { - return pipes; - } - - private void recoverPipe(InputStream inputStream) throws IOException { - byte nextByte; - while ((nextByte = ReadWriteIOUtils.readByte(inputStream)) != -1) { - SyncOperation operationType = SyncOperation.values()[nextByte]; - switch (operationType) { - case CREATE_PIPESINK: - PipeSink pipeSink = PipeSink.deserializePipeSink(inputStream); - pipeSinks.put(pipeSink.getPipeSinkName(), pipeSink); - break; - case DROP_PIPESINK: - pipeSinks.remove(ReadWriteIOUtils.readString(inputStream)); - break; - case CREATE_PIPE: - PipeInfo pipeInfo = PipeInfo.deserializePipeInfo(inputStream); - pipes.putIfAbsent(pipeInfo.getPipeName(), pipeInfo); - break; - case STOP_PIPE: - String pipeName = ReadWriteIOUtils.readString(inputStream); - pipes.get(pipeName).setStatus(PipeStatus.STOP); - break; - case START_PIPE: - pipeName = ReadWriteIOUtils.readString(inputStream); - pipes.get(pipeName).setStatus(PipeStatus.RUNNING); - break; - case DROP_PIPE: - pipeName = ReadWriteIOUtils.readString(inputStream); - pipes.remove(pipeName); - break; - default: - throw new UnsupportedOperationException( - String.format("Can not recognize SyncOperation %s.", operationType.name())); - } - } - } -} diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/persistence/SyncLogWriter.java b/node-commons/src/main/java/org/apache/iotdb/commons/sync/persistence/SyncLogWriter.java deleted file mode 100644 index dc02f69f294..00000000000 --- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/persistence/SyncLogWriter.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iotdb.commons.sync.persistence; - -import org.apache.iotdb.commons.sync.pipe.PipeInfo; -import org.apache.iotdb.commons.sync.pipe.SyncOperation; -import org.apache.iotdb.commons.sync.pipesink.PipeSink; -import org.apache.iotdb.commons.sync.utils.SyncConstant; -import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; - -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStream; - -/** - * SyncLogger is used to manage the persistent information in the sync module. Persistent - * information can be recovered on reboot via {@linkplain SyncLogReader}. - */ -public class SyncLogWriter implements AutoCloseable { - // record pipe meta info - private OutputStream outputStream; - private final File dir; - private final String fileName; - - public SyncLogWriter(File dir) { - this.dir = dir; - this.fileName = SyncConstant.SYNC_LOG_NAME; - } - - public SyncLogWriter(File dir, String fileName) { - this.dir = dir; - this.fileName = fileName; - } - - public void initOutputStream() throws IOException { - if (outputStream == null) { - File logFile = new File(dir, fileName); - if (!logFile.getParentFile().exists()) { - logFile.getParentFile().mkdirs(); - } - outputStream = new FileOutputStream(logFile, true); - } - } - - public synchronized void addPipeSink(PipeSink pipeSink) throws IOException { - initOutputStream(); - ReadWriteIOUtils.write((byte) SyncOperation.CREATE_PIPESINK.ordinal(), outputStream); - pipeSink.serialize(outputStream); - } - - public synchronized void dropPipeSink(String pipeSinkName) throws IOException { - initOutputStream(); - ReadWriteIOUtils.write((byte) SyncOperation.DROP_PIPESINK.ordinal(), outputStream); - ReadWriteIOUtils.write(pipeSinkName, outputStream); - } - - public synchronized void addPipe(PipeInfo pipeInfo) throws IOException { - initOutputStream(); - ReadWriteIOUtils.write((byte) SyncOperation.CREATE_PIPE.ordinal(), outputStream); - pipeInfo.serialize(outputStream); - } - - public synchronized void operatePipe(String pipeName, SyncOperation syncOperation) - throws IOException { - initOutputStream(); - ReadWriteIOUtils.write((byte) syncOperation.ordinal(), outputStream); - ReadWriteIOUtils.write(pipeName, outputStream); - } - - @Override - public void close() throws IOException { - if (outputStream != null) { - outputStream.close(); - outputStream = null; - } - } -} diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/SyncOperation.java b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/SyncOperation.java deleted file mode 100644 index 5b5fcf5e9a0..00000000000 --- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/SyncOperation.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iotdb.commons.sync.pipe; - -public enum SyncOperation { - CREATE_PIPESINK, - DROP_PIPESINK, - // PIPE - CREATE_PIPE, - START_PIPE, - STOP_PIPE, - DROP_PIPE -} diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipesink/IoTDBPipeSink.java b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipesink/IoTDBPipeSink.java deleted file mode 100644 index 1396a88ac88..00000000000 --- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipesink/IoTDBPipeSink.java +++ /dev/null @@ -1,166 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.iotdb.commons.sync.pipesink; - -import org.apache.iotdb.commons.exception.sync.PipeSinkException; -import org.apache.iotdb.commons.sync.utils.SyncConstant; -import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo; -import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.Map; -import java.util.Objects; -import java.util.regex.Pattern; - -public class IoTDBPipeSink implements PipeSink { - private static final PipeSinkType pipeSinkType = PipeSinkType.IoTDB; - - private String name; - private String ip; - private int port; - - private static final String ATTRIBUTE_IP_KEY = "ip"; - private static final String ATTRIBUTE_PORT_KEY = "port"; - - public IoTDBPipeSink() {} - - public IoTDBPipeSink(String name) { - this(); - this.ip = SyncConstant.DEFAULT_PIPE_SINK_IP; - this.port = SyncConstant.DEFAULT_PIPE_SINK_PORT; - this.name = name; - } - - @Override - public void setAttribute(Map<String, String> params) throws PipeSinkException { - for (Map.Entry<String, String> entry : params.entrySet()) { - String attr = entry.getKey(); - String value = entry.getValue(); - - attr = attr.toLowerCase(); - if (attr.equals(ATTRIBUTE_IP_KEY)) { - if (!Pattern.matches(SyncConstant.IPV4_PATTERN, value)) { - throw new PipeSinkException( - String.format("%s is nonstandard IP address, only support IPv4 now.", value)); - } - ip = value; - } else if (attr.equals(ATTRIBUTE_PORT_KEY)) { - try { - port = Integer.parseInt(value); - } catch (NumberFormatException e) { - throw new PipeSinkException(attr, value, TSDataType.INT32.name()); - } - } else { - throw new PipeSinkException( - "There is No attribute " + attr + " in " + PipeSinkType.IoTDB + " pipeSink."); - } - } - } - - public String getIp() { - return ip; - } - - public int getPort() { - return port; - } - - @Override - public String getPipeSinkName() { - return name; - } - - @Override - public PipeSinkType getType() { - return pipeSinkType; - } - - @Override - public String showAllAttributes() { - return String.format("%s='%s',%s=%d", ATTRIBUTE_IP_KEY, ip, ATTRIBUTE_PORT_KEY, port); - } - - @Override - public TPipeSinkInfo getTPipeSinkInfo() { - Map<String, String> attributes = new HashMap<>(); - attributes.put(ATTRIBUTE_IP_KEY, ip); - attributes.put(ATTRIBUTE_PORT_KEY, String.valueOf(port)); - return new TPipeSinkInfo(this.name, this.pipeSinkType.name()).setAttributes(attributes); - } - - @Override - public void serialize(OutputStream outputStream) throws IOException { - ReadWriteIOUtils.write((byte) pipeSinkType.ordinal(), outputStream); - ReadWriteIOUtils.write(name, outputStream); - ReadWriteIOUtils.write(ip, outputStream); - ReadWriteIOUtils.write(port, outputStream); - } - - @Override - public void deserialize(InputStream inputStream) throws IOException { - name = ReadWriteIOUtils.readString(inputStream); - ip = ReadWriteIOUtils.readString(inputStream); - port = ReadWriteIOUtils.readInt(inputStream); - } - - @Override - public void deserialize(ByteBuffer buffer) { - name = ReadWriteIOUtils.readString(buffer); - ip = ReadWriteIOUtils.readString(buffer); - port = ReadWriteIOUtils.readInt(buffer); - } - - @Override - public String toString() { - return "IoTDBPipeSink{" - + "pipeSinkType=" - + pipeSinkType - + ", name='" - + name - + '\'' - + ", ip='" - + ip - + '\'' - + ", port=" - + port - + '}'; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - IoTDBPipeSink pipeSink = (IoTDBPipeSink) o; - return port == pipeSink.port - && pipeSinkType == pipeSink.pipeSinkType - && Objects.equals(name, pipeSink.name) - && Objects.equals(ip, pipeSink.ip); - } - - @Override - public int hashCode() { - return Objects.hash(pipeSinkType, name, ip, port); - } -} diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipesink/PipeSink.java b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipesink/PipeSink.java deleted file mode 100644 index ad54ad7eb23..00000000000 --- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipesink/PipeSink.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.iotdb.commons.sync.pipesink; - -import org.apache.iotdb.commons.exception.sync.PipeSinkException; -import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo; -import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.util.Map; - -public interface PipeSink { - - void setAttribute(Map<String, String> params) throws PipeSinkException; - - String getPipeSinkName(); - - PipeSinkType getType(); - - String showAllAttributes(); - - TPipeSinkInfo getTPipeSinkInfo(); - - void serialize(OutputStream outputStream) throws IOException; - - void deserialize(InputStream inputStream) throws IOException; - - void deserialize(ByteBuffer buffer); - - static PipeSink deserializePipeSink(InputStream inputStream) throws IOException { - PipeSinkType pipeSinkType = PipeSinkType.values()[ReadWriteIOUtils.readByte(inputStream)]; - PipeSink pipeSink; - switch (pipeSinkType) { - case IoTDB: - pipeSink = new IoTDBPipeSink(); - pipeSink.deserialize(inputStream); - break; - case ExternalPipe: - // TODO(ext-pipe): deserialize external pipesink here - default: - throw new UnsupportedOperationException( - String.format("Can not recognize PipeSinkType %s.", pipeSinkType.name())); - } - return pipeSink; - } - - static PipeSink deserializePipeSink(ByteBuffer buffer) { - PipeSinkType pipeSinkType = PipeSinkType.values()[ReadWriteIOUtils.readByte(buffer)]; - PipeSink pipeSink; - switch (pipeSinkType) { - case IoTDB: - pipeSink = new IoTDBPipeSink(); - pipeSink.deserialize(buffer); - break; - case ExternalPipe: - // TODO(ext-pipe): deserialize external pipesink here - default: - throw new UnsupportedOperationException( - String.format("Can not recognize PipeSinkType %s.", pipeSinkType.name())); - } - return pipeSink; - } - - enum PipeSinkType { - IoTDB, - ExternalPipe - } -} diff --git a/node-commons/src/test/java/org/apache/iotdb/commons/sync/metedata/SyncMetadataTest.java b/node-commons/src/test/java/org/apache/iotdb/commons/sync/metedata/SyncMetadataTest.java deleted file mode 100644 index b634534bcfa..00000000000 --- a/node-commons/src/test/java/org/apache/iotdb/commons/sync/metedata/SyncMetadataTest.java +++ /dev/null @@ -1,189 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iotdb.commons.sync.metedata; - -import org.apache.iotdb.commons.exception.sync.PipeAlreadyExistException; -import org.apache.iotdb.commons.exception.sync.PipeException; -import org.apache.iotdb.commons.exception.sync.PipeNotExistException; -import org.apache.iotdb.commons.exception.sync.PipeSinkAlreadyExistException; -import org.apache.iotdb.commons.exception.sync.PipeSinkBeingUsedException; -import org.apache.iotdb.commons.exception.sync.PipeSinkException; -import org.apache.iotdb.commons.exception.sync.PipeSinkNotExistException; -import org.apache.iotdb.commons.sync.metadata.SyncMetadata; -import org.apache.iotdb.commons.sync.pipe.PipeInfo; -import org.apache.iotdb.commons.sync.pipe.PipeMessage; -import org.apache.iotdb.commons.sync.pipe.PipeStatus; -import org.apache.iotdb.commons.sync.pipe.TsFilePipeInfo; -import org.apache.iotdb.commons.sync.pipesink.IoTDBPipeSink; -import org.apache.iotdb.commons.sync.pipesink.PipeSink; - -import org.apache.commons.io.FileUtils; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; - -import java.io.File; - -public class SyncMetadataTest { - private static final File snapshotDir = new File("target", "snapshot"); - - private static final String PIPESINK_NAME_1 = "pipeSink1"; - private static final String PIPESINK_NAME_2 = "pipeSink2"; - private static PipeSink ioTDBPipeSink1; - private static PipeSink ioTDBPipeSink2; - - private static final String PIPE_NAME_1 = "pipe1"; - private static final String PIPE_NAME_2 = "pipe2"; - private static PipeInfo pipeInfo1; - private static PipeInfo pipeInfo2; - - @BeforeClass - public static void setUp() { - ioTDBPipeSink1 = new IoTDBPipeSink(PIPESINK_NAME_1); - ioTDBPipeSink2 = new IoTDBPipeSink(PIPESINK_NAME_2); - pipeInfo1 = - new TsFilePipeInfo(PIPE_NAME_1, PIPESINK_NAME_1, System.currentTimeMillis(), 0, true); - pipeInfo2 = - new TsFilePipeInfo(PIPE_NAME_2, PIPESINK_NAME_2, System.currentTimeMillis(), 99, false); - } - - @Test - public void testPipeSinkOperation() { - SyncMetadata syncMetadata = new SyncMetadata(); - Assert.assertFalse(syncMetadata.isPipeSinkExist(PIPESINK_NAME_1)); - // check and add ioTDBPipeSink1 - try { - syncMetadata.checkPipeSinkNoExist(PIPESINK_NAME_1); - } catch (PipeSinkException e) { - Assert.fail(e.getMessage()); - } - syncMetadata.addPipeSink(ioTDBPipeSink1); - Assert.assertTrue(syncMetadata.isPipeSinkExist(PIPESINK_NAME_1)); - try { - syncMetadata.checkPipeSinkNoExist(PIPESINK_NAME_1); - Assert.fail(); - } catch (PipeSinkException e) { - Assert.assertTrue(e instanceof PipeSinkAlreadyExistException); - } - // add ioTDBPipeSink2 - try { - syncMetadata.checkDropPipeSink(PIPESINK_NAME_2); - Assert.fail(); - } catch (PipeSinkException e) { - Assert.assertTrue(e instanceof PipeSinkNotExistException); - } - syncMetadata.addPipeSink(ioTDBPipeSink2); - // get PipeSink - Assert.assertEquals(ioTDBPipeSink1, syncMetadata.getPipeSink(PIPESINK_NAME_1)); - Assert.assertEquals(ioTDBPipeSink2, syncMetadata.getPipeSink(PIPESINK_NAME_2)); - Assert.assertEquals(2, syncMetadata.getAllPipeSink().size()); - // drop ioTDBPipeSink2 - syncMetadata.addPipe(pipeInfo2); - try { - syncMetadata.checkDropPipeSink(PIPESINK_NAME_2); - Assert.fail(); - } catch (PipeSinkException e) { - Assert.assertTrue(e instanceof PipeSinkBeingUsedException); - } - syncMetadata.dropPipe(PIPE_NAME_2); - try { - syncMetadata.checkDropPipeSink(PIPESINK_NAME_2); - } catch (PipeSinkException e) { - Assert.fail(e.getMessage()); - } - syncMetadata.dropPipeSink(PIPESINK_NAME_2); - Assert.assertNull(syncMetadata.getPipeSink(PIPESINK_NAME_2)); - Assert.assertEquals(1, syncMetadata.getAllPipeSink().size()); - } - - @Test - public void testPipeOperation() { - SyncMetadata syncMetadata = new SyncMetadata(); - // check add pipe - try { - syncMetadata.checkAddPipe(pipeInfo1); - Assert.fail(); - } catch (Exception e) { - Assert.assertTrue(e instanceof PipeSinkNotExistException); - } - try { - syncMetadata.checkIfPipeExist(PIPE_NAME_1); - Assert.fail(); - } catch (PipeException e) { - Assert.assertTrue(e instanceof PipeNotExistException); - } - syncMetadata.addPipeSink(ioTDBPipeSink1); - syncMetadata.addPipeSink(ioTDBPipeSink2); - try { - syncMetadata.checkAddPipe(pipeInfo1); - } catch (Exception e) { - Assert.fail(e.getMessage()); - } - syncMetadata.addPipe(pipeInfo1); - try { - syncMetadata.checkAddPipe( - new TsFilePipeInfo(PIPE_NAME_1, PIPESINK_NAME_2, System.currentTimeMillis(), 99, false)); - Assert.fail(); - } catch (Exception e) { - Assert.assertTrue(e instanceof PipeAlreadyExistException); - Assert.assertTrue(e.getMessage().contains("please retry after drop it")); - } - syncMetadata.addPipe(pipeInfo2); - // check get pipe - Assert.assertEquals(pipeInfo1, syncMetadata.getPipeInfo(PIPE_NAME_1)); - Assert.assertEquals(pipeInfo2, syncMetadata.getPipeInfo(PIPE_NAME_2)); - Assert.assertEquals(2, syncMetadata.getAllPipeInfos().size()); - // check setter - syncMetadata.setPipeStatus(PIPE_NAME_1, PipeStatus.RUNNING); - syncMetadata.changePipeMessage(PIPE_NAME_1, PipeMessage.PipeMessageType.WARN); - Assert.assertEquals(PipeStatus.RUNNING, syncMetadata.getPipeInfo(PIPE_NAME_1).getStatus()); - Assert.assertEquals( - PipeMessage.PipeMessageType.WARN, syncMetadata.getPipeInfo(PIPE_NAME_1).getMessageType()); - // check drop - syncMetadata.dropPipe(PIPE_NAME_1); - Assert.assertNull(syncMetadata.getPipeInfo(PIPE_NAME_1)); - Assert.assertEquals(1, syncMetadata.getAllPipeInfos().size()); - } - - @Test - public void testSnapshot() throws Exception { - try { - if (snapshotDir.exists()) { - FileUtils.deleteDirectory(snapshotDir); - } - snapshotDir.mkdirs(); - SyncMetadata syncMetadata = new SyncMetadata(); - syncMetadata.addPipeSink(ioTDBPipeSink1); - syncMetadata.addPipeSink(ioTDBPipeSink2); - syncMetadata.addPipe(pipeInfo1); - syncMetadata.addPipe(pipeInfo2); - syncMetadata.processTakeSnapshot(snapshotDir); - SyncMetadata syncMetadata1 = new SyncMetadata(); - syncMetadata1.processLoadSnapshot(snapshotDir); - Assert.assertEquals(pipeInfo1, syncMetadata.getPipeInfo(PIPE_NAME_1)); - Assert.assertEquals(pipeInfo2, syncMetadata.getPipeInfo(PIPE_NAME_2)); - Assert.assertEquals(ioTDBPipeSink1, syncMetadata.getPipeSink(PIPESINK_NAME_1)); - Assert.assertEquals(ioTDBPipeSink2, syncMetadata.getPipeSink(PIPESINK_NAME_2)); - } finally { - if (snapshotDir.exists()) { - FileUtils.deleteDirectory(snapshotDir); - } - } - } -} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/lagacy/IoTDBSyncReceiverV1_1.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/lagacy/IoTDBSyncReceiverV1_1.java index d8b6b7fbfb4..97990579caf 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/lagacy/IoTDBSyncReceiverV1_1.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/lagacy/IoTDBSyncReceiverV1_1.java @@ -21,11 +21,7 @@ package org.apache.iotdb.db.pipe.connector.lagacy; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.exception.IllegalPathException; -import org.apache.iotdb.commons.exception.sync.PipeDataLoadException; import org.apache.iotdb.commons.path.PartialPath; -import org.apache.iotdb.commons.sync.transport.SyncIdentityInfo; -import org.apache.iotdb.commons.sync.utils.SyncConstant; -import org.apache.iotdb.commons.sync.utils.SyncPathUtil; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.mpp.plan.Coordinator; @@ -33,8 +29,12 @@ import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher; import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher; import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult; import org.apache.iotdb.db.mpp.plan.statement.metadata.DatabaseSchemaStatement; +import org.apache.iotdb.db.pipe.connector.lagacy.exception.SyncDataLoadException; import org.apache.iotdb.db.pipe.connector.lagacy.pipedata.PipeData; import org.apache.iotdb.db.pipe.connector.lagacy.pipedata.TsFilePipeData; +import org.apache.iotdb.db.pipe.connector.lagacy.transport.SyncIdentityInfo; +import org.apache.iotdb.db.pipe.connector.lagacy.utils.SyncConstant; +import org.apache.iotdb.db.pipe.connector.lagacy.utils.SyncPathUtil; import org.apache.iotdb.db.query.control.SessionManager; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; @@ -280,7 +280,7 @@ public class IoTDBSyncReceiverV1_1 { pipeData.createLoader().load(); logger.info( "Load pipeData with serialize number {} successfully.", pipeData.getSerialNumber()); - } catch (PipeDataLoadException e) { + } catch (SyncDataLoadException e) { logger.error("Fail to load pipeData because {}.", e.getMessage()); return RpcUtils.getStatus( TSStatusCode.PIPESERVER_ERROR, "Fail to load pipeData because " + e.getMessage()); diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/exception/sync/PipeDataLoadException.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/lagacy/exception/SyncDataLoadException.java similarity index 78% rename from node-commons/src/main/java/org/apache/iotdb/commons/exception/sync/PipeDataLoadException.java rename to server/src/main/java/org/apache/iotdb/db/pipe/connector/lagacy/exception/SyncDataLoadException.java index a873325d047..399c1373d92 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/exception/sync/PipeDataLoadException.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/lagacy/exception/SyncDataLoadException.java @@ -16,10 +16,13 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.commons.exception.sync; +package org.apache.iotdb.db.pipe.connector.lagacy.exception; -public class PipeDataLoadException extends PipeServerException { - public PipeDataLoadException(String message) { +import org.apache.iotdb.pipe.api.exception.PipeException; + +public class SyncDataLoadException extends PipeException { + + public SyncDataLoadException(String message) { super(message); } } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/lagacy/pipedata/TsFilePipeData.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/lagacy/pipedata/TsFilePipeData.java index 393e3c75937..979d9fbfd56 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/lagacy/pipedata/TsFilePipeData.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/lagacy/pipedata/TsFilePipeData.java @@ -20,11 +20,11 @@ package org.apache.iotdb.db.pipe.connector.lagacy.pipedata; import org.apache.iotdb.commons.exception.IllegalPathException; -import org.apache.iotdb.commons.sync.utils.SyncConstant; import org.apache.iotdb.db.engine.modification.ModificationFile; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.pipe.connector.lagacy.pipedata.load.ILoader; import org.apache.iotdb.db.pipe.connector.lagacy.pipedata.load.TsFileLoader; +import org.apache.iotdb.db.pipe.connector.lagacy.utils.SyncConstant; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; import org.slf4j.Logger; diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/lagacy/pipedata/load/DeletionLoader.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/lagacy/pipedata/load/DeletionLoader.java index c3459aec42c..0e0c2f7b7a8 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/lagacy/pipedata/load/DeletionLoader.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/lagacy/pipedata/load/DeletionLoader.java @@ -19,7 +19,6 @@ package org.apache.iotdb.db.pipe.connector.lagacy.pipedata.load; import org.apache.iotdb.commons.conf.CommonDescriptor; -import org.apache.iotdb.commons.exception.sync.PipeDataLoadException; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.modification.Deletion; import org.apache.iotdb.db.exception.LoadFileException; @@ -28,6 +27,7 @@ import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult; import org.apache.iotdb.db.mpp.plan.statement.Statement; import org.apache.iotdb.db.mpp.plan.statement.crud.DeleteDataStatement; import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteTimeSeriesStatement; +import org.apache.iotdb.db.pipe.connector.lagacy.exception.SyncDataLoadException; import org.apache.iotdb.db.query.control.SessionManager; import org.apache.iotdb.rpc.TSStatusCode; @@ -47,9 +47,9 @@ public class DeletionLoader implements ILoader { } @Override - public void load() throws PipeDataLoadException { + public void load() throws SyncDataLoadException { if (CommonDescriptor.getInstance().getConfig().isReadOnly()) { - throw new PipeDataLoadException("storage engine readonly"); + throw new SyncDataLoadException("storage engine readonly"); } try { Statement statement = generateStatement(); @@ -71,7 +71,7 @@ public class DeletionLoader implements ILoader { String.format("Can not execute delete statement: %s", statement)); } } catch (Exception e) { - throw new PipeDataLoadException(e.getMessage()); + throw new SyncDataLoadException(e.getMessage()); } } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/lagacy/pipedata/load/ILoader.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/lagacy/pipedata/load/ILoader.java index 3016e0a794c..de07aec08fa 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/lagacy/pipedata/load/ILoader.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/lagacy/pipedata/load/ILoader.java @@ -18,11 +18,11 @@ */ package org.apache.iotdb.db.pipe.connector.lagacy.pipedata.load; -import org.apache.iotdb.commons.exception.sync.PipeDataLoadException; import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher; import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher; import org.apache.iotdb.db.mpp.plan.analyze.schema.ClusterSchemaFetcher; import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher; +import org.apache.iotdb.db.pipe.connector.lagacy.exception.SyncDataLoadException; /** * This interface is used to load files, including tsFile, syncTask, schema, modsFile and @@ -34,5 +34,5 @@ public interface ILoader { ISchemaFetcher SCHEMA_FETCHER = ClusterSchemaFetcher.getInstance(); - void load() throws PipeDataLoadException; + void load() throws SyncDataLoadException; } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/lagacy/pipedata/load/TsFileLoader.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/lagacy/pipedata/load/TsFileLoader.java index 48b81a116ed..5cfd59e6661 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/lagacy/pipedata/load/TsFileLoader.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/lagacy/pipedata/load/TsFileLoader.java @@ -19,13 +19,13 @@ package org.apache.iotdb.db.pipe.connector.lagacy.pipedata.load; import org.apache.iotdb.commons.exception.IllegalPathException; -import org.apache.iotdb.commons.exception.sync.PipeDataLoadException; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.LoadFileException; import org.apache.iotdb.db.mpp.plan.Coordinator; import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult; import org.apache.iotdb.db.mpp.plan.statement.crud.LoadTsFileStatement; +import org.apache.iotdb.db.pipe.connector.lagacy.exception.SyncDataLoadException; import org.apache.iotdb.db.query.control.SessionManager; import org.apache.iotdb.rpc.TSStatusCode; @@ -47,7 +47,7 @@ public class TsFileLoader implements ILoader { } @Override - public void load() throws PipeDataLoadException { + public void load() throws SyncDataLoadException { try { LoadTsFileStatement statement = new LoadTsFileStatement(tsFile.getAbsolutePath()); @@ -74,7 +74,7 @@ public class TsFileLoader implements ILoader { String.format("Can not execute load TsFile statement: %s", statement)); } } catch (Exception e) { - throw new PipeDataLoadException(e.getMessage()); + throw new SyncDataLoadException(e.getMessage()); } } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/lagacy/pipedata/queue/BufferedPipeDataQueue.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/lagacy/pipedata/queue/BufferedPipeDataQueue.java index 2ffe440602e..d6c28f4c4da 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/lagacy/pipedata/queue/BufferedPipeDataQueue.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/lagacy/pipedata/queue/BufferedPipeDataQueue.java @@ -19,11 +19,11 @@ package org.apache.iotdb.db.pipe.connector.lagacy.pipedata.queue; import org.apache.iotdb.commons.exception.IllegalPathException; -import org.apache.iotdb.commons.sync.utils.SyncConstant; -import org.apache.iotdb.commons.sync.utils.SyncPathUtil; import org.apache.iotdb.commons.utils.FileUtils; import org.apache.iotdb.db.pipe.connector.lagacy.pipedata.PipeData; import org.apache.iotdb.db.pipe.connector.lagacy.pipedata.TsFilePipeData; +import org.apache.iotdb.db.pipe.connector.lagacy.utils.SyncConstant; +import org.apache.iotdb.db.pipe.connector.lagacy.utils.SyncPathUtil; import org.apache.iotdb.tsfile.exception.NotImplementedException; import org.slf4j.Logger; diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/transport/SyncIdentityInfo.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/lagacy/transport/SyncIdentityInfo.java similarity index 96% rename from node-commons/src/main/java/org/apache/iotdb/commons/sync/transport/SyncIdentityInfo.java rename to server/src/main/java/org/apache/iotdb/db/pipe/connector/lagacy/transport/SyncIdentityInfo.java index 139a715fa9b..7691cfaa6ca 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/transport/SyncIdentityInfo.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/lagacy/transport/SyncIdentityInfo.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.commons.sync.transport; +package org.apache.iotdb.db.pipe.connector.lagacy.transport; import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo; diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/utils/SyncConstant.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/lagacy/utils/SyncConstant.java similarity index 65% rename from node-commons/src/main/java/org/apache/iotdb/commons/sync/utils/SyncConstant.java rename to server/src/main/java/org/apache/iotdb/db/pipe/connector/lagacy/utils/SyncConstant.java index 427a2113972..10489a329b1 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/utils/SyncConstant.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/lagacy/utils/SyncConstant.java @@ -16,18 +16,13 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.commons.sync.utils; - -import org.apache.iotdb.rpc.RpcUtils; +package org.apache.iotdb.db.pipe.connector.lagacy.utils; public class SyncConstant { /** common */ - public static final String UNKNOWN_IP = "UNKNOWN IP"; - public static final String SYNC_SYS_DIR = "sys"; + public static final String FILE_DATA_DIR_NAME = "file-data"; - public static final String ROLE_SENDER = "sender"; - public static final String ROLE_RECEIVER = "receiver"; // pipe log: serialNumber + SEPARATOR + SUFFIX public static final String PIPE_LOG_DIR_NAME = "pipe-log"; @@ -46,42 +41,22 @@ public class SyncConstant { public static final String SENDER_DIR_NAME = "sender"; public static final String HISTORY_PIPE_LOG_DIR_NAME = "history-" + PIPE_LOG_DIR_NAME; - public static final String FINISH_COLLECT_LOCK_NAME = "finishCollect.lock"; - public static final String MODS_OFFSET_FILE_SUFFIX = ".offset"; - - // recover - public static final String PLAN_SERIALIZE_SPLIT_CHARACTER = ","; - public static final String SENDER_LOG_SPLIT_CHARACTER = "#"; // data config public static final String DEFAULT_PIPE_SINK_IP = "127.0.0.1"; public static final int DEFAULT_PIPE_SINK_PORT = 6667; - public static final int CONNECT_TIMEOUT_MILLISECONDS = 1_000; - public static final int SOCKET_TIMEOUT_MILLISECONDS = 100_000; - public static final Long DEFAULT_WAITING_FOR_TSFILE_CLOSE_MILLISECONDS = 500L; public static final Long DEFAULT_WAITING_FOR_TSFILE_RETRY_NUMBER = 10L; - public static final Long DEFAULT_WAITING_FOR_STOP_MILLISECONDS = 1000L; /** transport */ - - // Split data file, block size at each transmission */ - public static final int DATA_CHUNK_SIZE = - Math.min(16 * 1024 * 1024, RpcUtils.THRIFT_FRAME_MAX_SIZE); - public static final String PATCH_SUFFIX = ".patch"; + public static final String IPV4_PATTERN = "^([1-9]|[1-9]\\d|1\\d{2}|2[0-4]\\d|25[0-5])(\\.(\\d|[1-9]\\d|1\\d{2}|2[0-4]\\d|25[0-5])){3}$"; - public static final int HEARTBEAT_INTERVAL_MILLISECONDS = 5_000; - public static final int LOST_CONNECT_REPORT_MILLISECONDS = 30_000; /** receiver */ public static final String RECEIVER_DIR_NAME = "receiver"; public static final String IP_SEPARATOR = "\\."; - - // TODO: serialize AbstractReceiverInfo - public static final byte PIPE_NAME_MAP_TYPE = 0; - public static final byte PIPE_MESSAGE_TYPE = 2; } diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/utils/SyncPathUtil.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/lagacy/utils/SyncPathUtil.java similarity index 56% rename from node-commons/src/main/java/org/apache/iotdb/commons/sync/utils/SyncPathUtil.java rename to server/src/main/java/org/apache/iotdb/db/pipe/connector/lagacy/utils/SyncPathUtil.java index 157110d498c..ae82ddf1d22 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/utils/SyncPathUtil.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/lagacy/utils/SyncPathUtil.java @@ -17,10 +17,10 @@ * under the License. */ -package org.apache.iotdb.commons.sync.utils; +package org.apache.iotdb.db.pipe.connector.lagacy.utils; import org.apache.iotdb.commons.conf.CommonDescriptor; -import org.apache.iotdb.commons.sync.transport.SyncIdentityInfo; +import org.apache.iotdb.db.pipe.connector.lagacy.transport.SyncIdentityInfo; import java.io.File; import java.io.IOException; @@ -45,55 +45,6 @@ public class SyncPathUtil { // | |----file data dir // |----sys dir - /** sender */ - public static String getSenderDir() { - return CommonDescriptor.getInstance().getConfig().getSyncDir() - + File.separator - + SyncConstant.SENDER_DIR_NAME; - } - - public static String getSenderPipeDir(String pipeName, long createTime) { - return getSenderDir() + File.separator + getSenderPipeDirName(pipeName, createTime); - } - - public static String getSenderPipeDirName(String pipeName, long createTime) { - return String.format("%s-%d", pipeName, createTime); - } - - public static String getSenderDataRegionHistoryPipeLogDir( - String pipeName, long createTime, String dataRegionId) { - return getSenderPipeDir(pipeName, createTime) - + File.separator - + SyncConstant.HISTORY_PIPE_LOG_DIR_NAME - + File.separator - + dataRegionId; - } - - public static String getSenderDataRegionRealTimePipeLogDir( - String pipeName, long createTime, String dataRegionId) { - return getSenderPipeDir(pipeName, createTime) - + File.separator - + SyncConstant.PIPE_LOG_DIR_NAME - + File.separator - + dataRegionId; - } - - public static String getSenderHistoryPipeLogDir(String pipeName, long createTime) { - return getSenderPipeDir(pipeName, createTime) - + File.separator - + SyncConstant.HISTORY_PIPE_LOG_DIR_NAME; - } - - public static String getSenderRealTimePipeLogDir(String pipeName, long createTime) { - return getSenderPipeDir(pipeName, createTime) + File.separator + SyncConstant.PIPE_LOG_DIR_NAME; - } - - public static String getSenderFileDataDir(String pipeName, long createTime) { - return getSenderPipeDir(pipeName, createTime) - + File.separator - + SyncConstant.FILE_DATA_DIR_NAME; - } - /** receiver */ public static String getReceiverDir() { return CommonDescriptor.getInstance().getConfig().getSyncDir() @@ -111,12 +62,6 @@ public class SyncPathUtil { return String.format("%s-%d-%s", pipeName, createTime, remoteIp); } - public static String getReceiverPipeLogDir(String pipeName, String remoteIp, long createTime) { - return getReceiverPipeDir(pipeName, remoteIp, createTime) - + File.separator - + SyncConstant.PIPE_LOG_DIR_NAME; - } - public static String getReceiverFileDataDir(String pipeName, String remoteIp, long createTime) { return getReceiverPipeDir(pipeName, remoteIp, createTime) + File.separator @@ -129,12 +74,6 @@ public class SyncPathUtil { } /** common */ - public static String getSysDir() { - return CommonDescriptor.getInstance().getConfig().getSyncDir() - + File.separator - + SyncConstant.SYNC_SYS_DIR; - } - public static String getPipeLogName(long serialNumber) { return serialNumber + SyncConstant.PIPE_LOG_NAME_SUFFIX; } @@ -149,8 +88,4 @@ public class SyncPathUtil { } return file.createNewFile(); } - - public static String createMsg(String timeStr, String msg) { - return String.format("[%s] %s", timeStr, msg); - } }
