This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch new_sync
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/new_sync by this push:
new b2baf89 [To new_sync][IOTDB-2272] remove duplicate PipeStatus and add PipeDataLoadException for ILoader (#5379)
b2baf89 is described below
commit b2baf8998bc7a8c58f80b205f47d506e484ddf19
Author: Chen YZ <[email protected]>
AuthorDate: Thu Mar 31 21:36:39 2022 +0800
[To new_sync][IOTDB-2272] remove duplicate PipeStatus and add PipeDataLoadException for ILoader (#5379)
---
.../db/integration/sync/IoTDBSyncReceiverIT.java | 5 +--
.../sync/PipeDataLoadBearableException.java} | 10 ++---
.../sync/PipeDataLoadException.java} | 10 ++---
.../sync/PipeDataLoadUnbearableException.java} | 10 ++---
.../iotdb/db/newsync/receiver/ReceiverService.java | 2 +-
.../db/newsync/receiver/collector/Collector.java | 18 ++++-----
.../db/newsync/receiver/load/DeletionLoader.java | 16 +++++---
.../iotdb/db/newsync/receiver/load/ILoader.java | 11 +-----
.../db/newsync/receiver/load/SchemaLoader.java | 17 +++++++-
.../db/newsync/receiver/load/TsFileLoader.java | 45 +++++++++++-----------
.../db/newsync/receiver/manager/PipeInfo.java | 2 +
.../newsync/receiver/manager/ReceiverManager.java | 1 +
.../db/newsync/receiver/recovery/ReceiverLog.java | 2 +-
.../receiver/recovery/ReceiverLogAnalyzer.java | 2 +-
.../receiver/manager/ReceiverManagerTest.java | 1 +
.../receiver/recovery/ReceiverLogAnalyzerTest.java | 2 +-
16 files changed, 83 insertions(+), 71 deletions(-)
diff --git
a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverIT.java
b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverIT.java
index 638b3f3..18f1935 100644
---
a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverIT.java
+++
b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverIT.java
@@ -27,8 +27,8 @@ import org.apache.iotdb.db.newsync.pipedata.PipeData;
import org.apache.iotdb.db.newsync.pipedata.SchemaPipeData;
import org.apache.iotdb.db.newsync.pipedata.TsFilePipeData;
import org.apache.iotdb.db.newsync.receiver.ReceiverService;
-import org.apache.iotdb.db.newsync.receiver.manager.PipeStatus;
import org.apache.iotdb.db.newsync.sender.pipe.Pipe;
+import org.apache.iotdb.db.newsync.sender.pipe.Pipe.PipeStatus;
import org.apache.iotdb.db.newsync.sender.pipe.TsFilePipe;
import org.apache.iotdb.db.newsync.transport.client.TransportClient;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
@@ -121,8 +121,7 @@ public class IoTDBSyncReceiverIT {
.getConfig()
.setEnableCrossSpaceCompaction(enableCrossSpaceCompaction);
FileUtils.deleteDirectory(tmpDir);
- // client.close();
- // ReceiverService.getInstance().stopPipeServer();
+ client.close();
EnvironmentUtils.cleanEnv();
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/manager/PipeStatus.java
b/server/src/main/java/org/apache/iotdb/db/exception/sync/PipeDataLoadBearableException.java
similarity index 80%
copy from
server/src/main/java/org/apache/iotdb/db/newsync/receiver/manager/PipeStatus.java
copy to
server/src/main/java/org/apache/iotdb/db/exception/sync/PipeDataLoadBearableException.java
index 38ea424..4638e35 100644
---
a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/manager/PipeStatus.java
+++
b/server/src/main/java/org/apache/iotdb/db/exception/sync/PipeDataLoadBearableException.java
@@ -16,10 +16,10 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.newsync.receiver.manager;
+package org.apache.iotdb.db.exception.sync;
-public enum PipeStatus {
- RUNNING,
- STOP,
- DROP
+public class PipeDataLoadBearableException extends PipeDataLoadException {
+ public PipeDataLoadBearableException(String message) {
+ super(message);
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/manager/PipeStatus.java
b/server/src/main/java/org/apache/iotdb/db/exception/sync/PipeDataLoadException.java
similarity index 81%
copy from
server/src/main/java/org/apache/iotdb/db/newsync/receiver/manager/PipeStatus.java
copy to
server/src/main/java/org/apache/iotdb/db/exception/sync/PipeDataLoadException.java
index 38ea424..c10d991 100644
---
a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/manager/PipeStatus.java
+++
b/server/src/main/java/org/apache/iotdb/db/exception/sync/PipeDataLoadException.java
@@ -16,10 +16,10 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.newsync.receiver.manager;
+package org.apache.iotdb.db.exception.sync;
-public enum PipeStatus {
- RUNNING,
- STOP,
- DROP
+public class PipeDataLoadException extends PipeServerException {
+ public PipeDataLoadException(String message) {
+ super(message);
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/manager/PipeStatus.java
b/server/src/main/java/org/apache/iotdb/db/exception/sync/PipeDataLoadUnbearableException.java
similarity index 79%
rename from
server/src/main/java/org/apache/iotdb/db/newsync/receiver/manager/PipeStatus.java
rename to
server/src/main/java/org/apache/iotdb/db/exception/sync/PipeDataLoadUnbearableException.java
index 38ea424..8ef4225 100644
---
a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/manager/PipeStatus.java
+++
b/server/src/main/java/org/apache/iotdb/db/exception/sync/PipeDataLoadUnbearableException.java
@@ -16,10 +16,10 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.newsync.receiver.manager;
+package org.apache.iotdb.db.exception.sync;
-public enum PipeStatus {
- RUNNING,
- STOP,
- DROP
+public class PipeDataLoadUnbearableException extends PipeDataLoadException {
+ public PipeDataLoadUnbearableException(String message) {
+ super(message);
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/ReceiverService.java
b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/ReceiverService.java
index 1276b78..5773271 100644
---
a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/ReceiverService.java
+++
b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/ReceiverService.java
@@ -26,8 +26,8 @@ import org.apache.iotdb.db.newsync.pipedata.queue.PipeDataQueueFactory;
import org.apache.iotdb.db.newsync.receiver.collector.Collector;
import org.apache.iotdb.db.newsync.receiver.manager.PipeInfo;
import org.apache.iotdb.db.newsync.receiver.manager.PipeMessage;
-import org.apache.iotdb.db.newsync.receiver.manager.PipeStatus;
import org.apache.iotdb.db.newsync.receiver.manager.ReceiverManager;
+import org.apache.iotdb.db.newsync.sender.pipe.Pipe.PipeStatus;
import org.apache.iotdb.db.newsync.transport.server.TransportServerManager;
import org.apache.iotdb.db.qp.physical.sys.ShowPipePlan;
import org.apache.iotdb.db.qp.physical.sys.ShowPipeServerPlan;
diff --git
a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/collector/Collector.java
b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/collector/Collector.java
index 20fe254..befaf04 100644
---
a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/collector/Collector.java
+++
b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/collector/Collector.java
@@ -21,7 +21,8 @@ package org.apache.iotdb.db.newsync.receiver.collector;
import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.concurrent.ThreadName;
-import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
+import org.apache.iotdb.db.exception.sync.PipeDataLoadBearableException;
+import org.apache.iotdb.db.exception.sync.PipeDataLoadException;
import org.apache.iotdb.db.newsync.conf.SyncPathUtil;
import org.apache.iotdb.db.newsync.pipedata.PipeData;
import org.apache.iotdb.db.newsync.pipedata.queue.PipeDataQueue;
@@ -135,18 +136,17 @@ public class Collector {
logger.warn("Be interrupted when waiting for pipe data, because {}", e.getMessage());
Thread.currentThread().interrupt();
break;
- } catch (StorageGroupAlreadySetException e) {
+ } catch (PipeDataLoadBearableException e) {
// bearable exception
- String msg =
- String.format(
- "Sync receiver try to set storage group %s that has already been set",
- e.getStorageGroupPath());
- logger.warn(msg);
+ logger.warn(e.getMessage());
ReceiverManager.getInstance()
.writePipeMessage(
- pipeName, remoteIp, createTime, new PipeMessage(PipeMessage.MsgType.WARN, msg));
+ pipeName,
+ remoteIp,
+ createTime,
+ new PipeMessage(PipeMessage.MsgType.WARN, e.getMessage()));
pipeDataQueue.commit();
- } catch (Exception e) {
+ } catch (PipeDataLoadException e) {
// unbearable exception
// TODO: should drop this pipe?
String msg;
diff --git
a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/load/DeletionLoader.java
b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/load/DeletionLoader.java
index b12c0cb..7d8564b 100644
---
a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/load/DeletionLoader.java
+++
b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/load/DeletionLoader.java
@@ -21,8 +21,8 @@ package org.apache.iotdb.db.newsync.receiver.load;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.modification.Deletion;
-import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.exception.StorageEngineReadonlyException;
+import org.apache.iotdb.db.exception.sync.PipeDataLoadException;
+import org.apache.iotdb.db.exception.sync.PipeDataLoadUnbearableException;
/** This loader is used to load deletion plan. */
public class DeletionLoader implements ILoader {
@@ -34,11 +34,15 @@ public class DeletionLoader implements ILoader {
}
@Override
- public void load() throws StorageEngineException {
+ public void load() throws PipeDataLoadException {
if (IoTDBDescriptor.getInstance().getConfig().isReadOnly()) {
- throw new StorageEngineReadonlyException();
+ throw new PipeDataLoadUnbearableException("storage engine readonly");
+ }
+ try {
+ StorageEngine.getInstance()
+ .delete(deletion.getPath(), deletion.getStartTime(), deletion.getEndTime(), 0, null);
+ } catch (Exception e) {
+ throw new PipeDataLoadUnbearableException(e.getMessage());
}
- StorageEngine.getInstance()
- .delete(deletion.getPath(), deletion.getStartTime(), deletion.getEndTime(), 0, null);
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/load/ILoader.java
b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/load/ILoader.java
index 05be693..c1e1b46 100644
---
a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/load/ILoader.java
+++
b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/load/ILoader.java
@@ -18,19 +18,12 @@
*/
package org.apache.iotdb.db.newsync.receiver.load;
-import org.apache.iotdb.db.exception.LoadFileException;
-import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
-
-import java.io.IOException;
+import org.apache.iotdb.db.exception.sync.PipeDataLoadException;
/**
* This interface is used to load files, including tsFile, syncTask, schema, modsFile and
* deletePlan.
*/
public interface ILoader {
- void load()
- throws StorageEngineException, IOException, MetadataException, WriteProcessException,
- LoadFileException;
+ void load() throws PipeDataLoadException;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/load/SchemaLoader.java
b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/load/SchemaLoader.java
index f708f4e..c67a005 100644
---
a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/load/SchemaLoader.java
+++
b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/load/SchemaLoader.java
@@ -19,6 +19,10 @@
package org.apache.iotdb.db.newsync.receiver.load;
import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
+import org.apache.iotdb.db.exception.sync.PipeDataLoadBearableException;
+import org.apache.iotdb.db.exception.sync.PipeDataLoadException;
+import org.apache.iotdb.db.exception.sync.PipeDataLoadUnbearableException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
@@ -41,7 +45,16 @@ public class SchemaLoader implements ILoader {
}
@Override
- public void load() throws IOException, MetadataException {
- MManager.getInstance().operation(plan);
+ public void load() throws PipeDataLoadException {
+ try {
+ MManager.getInstance().operation(plan);
+ } catch (StorageGroupAlreadySetException e) {
+ throw new PipeDataLoadBearableException(
+ "Sync receiver try to set storage group "
+ + e.getStorageGroupPath()
+ + " that has already been set");
+ } catch (IOException | MetadataException e) {
+ throw new PipeDataLoadUnbearableException(e.getMessage());
+ }
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/load/TsFileLoader.java
b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/load/TsFileLoader.java
index 1ee57d2..d5acca7 100644
---
a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/load/TsFileLoader.java
+++
b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/load/TsFileLoader.java
@@ -20,15 +20,12 @@ package org.apache.iotdb.db.newsync.receiver.load;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.exception.LoadFileException;
-import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.sync.PipeDataLoadException;
+import org.apache.iotdb.db.exception.sync.PipeDataLoadUnbearableException;
import org.apache.iotdb.db.tools.TsFileRewriteTool;
import org.apache.iotdb.db.utils.FileLoaderUtils;
-import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import java.io.File;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -42,26 +39,28 @@ public class TsFileLoader implements ILoader {
}
@Override
- public void load()
- throws IOException, MetadataException, WriteProcessException, StorageEngineException,
- LoadFileException {
- TsFileResource tsFileResource = new TsFileResource(tsFile);
- tsFileResource.setClosed(true);
- FileLoaderUtils.checkTsFileResource(tsFileResource);
- List<TsFileResource> splitResources = new ArrayList();
- if (tsFileResource.isSpanMultiTimePartitions()) {
- TsFileRewriteTool.rewriteTsFile(tsFileResource, splitResources);
- tsFileResource.writeLock();
- tsFileResource.removeModFile();
- tsFileResource.writeUnlock();
- }
+ public void load() throws PipeDataLoadException {
+ try {
+ TsFileResource tsFileResource = new TsFileResource(tsFile);
+ tsFileResource.setClosed(true);
+ FileLoaderUtils.checkTsFileResource(tsFileResource);
+ List<TsFileResource> splitResources = new ArrayList();
+ if (tsFileResource.isSpanMultiTimePartitions()) {
+ TsFileRewriteTool.rewriteTsFile(tsFileResource, splitResources);
+ tsFileResource.writeLock();
+ tsFileResource.removeModFile();
+ tsFileResource.writeUnlock();
+ }
- if (splitResources.isEmpty()) {
- splitResources.add(tsFileResource);
- }
+ if (splitResources.isEmpty()) {
+ splitResources.add(tsFileResource);
+ }
- for (TsFileResource resource : splitResources) {
- StorageEngine.getInstance().loadNewTsFile(resource);
+ for (TsFileResource resource : splitResources) {
+ StorageEngine.getInstance().loadNewTsFile(resource);
+ }
+ } catch (Exception e) {
+ throw new PipeDataLoadUnbearableException(e.getMessage());
}
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/manager/PipeInfo.java
b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/manager/PipeInfo.java
index 535d662..b3bf59a 100644
---
a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/manager/PipeInfo.java
+++
b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/manager/PipeInfo.java
@@ -18,6 +18,8 @@
*/
package org.apache.iotdb.db.newsync.receiver.manager;
+import org.apache.iotdb.db.newsync.sender.pipe.Pipe.PipeStatus;
+
import java.util.Objects;
public class PipeInfo {
diff --git
a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/manager/ReceiverManager.java
b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/manager/ReceiverManager.java
index 6c466bf..af0370b 100644
---
a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/manager/ReceiverManager.java
+++
b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/manager/ReceiverManager.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.db.exception.StartupException;
import org.apache.iotdb.db.newsync.conf.SyncPathUtil;
import org.apache.iotdb.db.newsync.receiver.recovery.ReceiverLog;
import org.apache.iotdb.db.newsync.receiver.recovery.ReceiverLogAnalyzer;
+import org.apache.iotdb.db.newsync.sender.pipe.Pipe.PipeStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git
a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/recovery/ReceiverLog.java
b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/recovery/ReceiverLog.java
index 926b920..f644c14 100644
---
a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/recovery/ReceiverLog.java
+++
b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/recovery/ReceiverLog.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.newsync.receiver.recovery;
import org.apache.iotdb.db.newsync.conf.SyncConstant;
import org.apache.iotdb.db.newsync.conf.SyncPathUtil;
import org.apache.iotdb.db.newsync.receiver.manager.PipeMessage;
-import org.apache.iotdb.db.newsync.receiver.manager.PipeStatus;
+import org.apache.iotdb.db.newsync.sender.pipe.Pipe.PipeStatus;
import java.io.BufferedWriter;
import java.io.File;
diff --git
a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/recovery/ReceiverLogAnalyzer.java
b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/recovery/ReceiverLogAnalyzer.java
index faf8351..a86f13e 100644
---
a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/recovery/ReceiverLogAnalyzer.java
+++
b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/recovery/ReceiverLogAnalyzer.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.db.newsync.conf.SyncConstant;
import org.apache.iotdb.db.newsync.conf.SyncPathUtil;
import org.apache.iotdb.db.newsync.receiver.manager.PipeInfo;
import org.apache.iotdb.db.newsync.receiver.manager.PipeMessage;
-import org.apache.iotdb.db.newsync.receiver.manager.PipeStatus;
+import org.apache.iotdb.db.newsync.sender.pipe.Pipe.PipeStatus;
import org.apache.iotdb.db.service.ServiceType;
import org.slf4j.Logger;
diff --git
a/server/src/test/java/org/apache/iotdb/db/newsync/receiver/manager/ReceiverManagerTest.java
b/server/src/test/java/org/apache/iotdb/db/newsync/receiver/manager/ReceiverManagerTest.java
index 5be7348..4e28755 100644
---
a/server/src/test/java/org/apache/iotdb/db/newsync/receiver/manager/ReceiverManagerTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/newsync/receiver/manager/ReceiverManagerTest.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.newsync.receiver.manager;
import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.newsync.sender.pipe.Pipe.PipeStatus;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.junit.After;
diff --git
a/server/src/test/java/org/apache/iotdb/db/newsync/receiver/recovery/ReceiverLogAnalyzerTest.java
b/server/src/test/java/org/apache/iotdb/db/newsync/receiver/recovery/ReceiverLogAnalyzerTest.java
index 717af73..968fc4f 100644
---
a/server/src/test/java/org/apache/iotdb/db/newsync/receiver/recovery/ReceiverLogAnalyzerTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/newsync/receiver/recovery/ReceiverLogAnalyzerTest.java
@@ -22,7 +22,7 @@ import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.newsync.conf.SyncPathUtil;
import org.apache.iotdb.db.newsync.receiver.manager.PipeInfo;
import org.apache.iotdb.db.newsync.receiver.manager.PipeMessage;
-import org.apache.iotdb.db.newsync.receiver.manager.PipeStatus;
+import org.apache.iotdb.db.newsync.sender.pipe.Pipe.PipeStatus;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.junit.After;