This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new aaf7fa2804 [IOTDB-5811] Pipe: PipeHistoricalCollector (#9700)
aaf7fa2804 is described below
commit aaf7fa2804f8e7daa2bf41b20e3b9118d2bb33f3
Author: yschengzi <[email protected]>
AuthorDate: Mon May 8 11:01:28 2023 +0800
[IOTDB-5811] Pipe: PipeHistoricalCollector (#9700)
Historical TsFile collection process:
get the DataRegion -> flush all TsFileProcessors -> get all TsFiles
---
.../iotdb/commons/consensus/DataRegionId.java | 4 +
.../historical/PipeHistoricalCollector.java | 22 -----
.../historical/PipeHistoricalTsFileCollector.java | 110 +++++++++++++++++++++
.../core/event/impl/PipeTsFileInsertionEvent.java | 1 -
4 files changed, 114 insertions(+), 23 deletions(-)
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/DataRegionId.java
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/DataRegionId.java
index 03ec5e4d7d..655de94da0 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/DataRegionId.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/DataRegionId.java
@@ -27,6 +27,10 @@ public class DataRegionId extends ConsensusGroupId {
this.id = id;
}
+ public DataRegionId(String id) {
+ this.id = Integer.parseInt(id);
+ }
+
@Override
public TConsensusGroupType getType() {
return TConsensusGroupType.DataRegion;
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalCollector.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalCollector.java
deleted file mode 100644
index 6e680de847..0000000000
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalCollector.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.core.collector.historical;
-
-public class PipeHistoricalCollector {}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalTsFileCollector.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalTsFileCollector.java
new file mode 100644
index 0000000000..d35245fe89
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalTsFileCollector.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.collector.historical;
+
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.storagegroup.DataRegion;
+import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
+import org.apache.iotdb.db.pipe.core.event.impl.PipeTsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.PipeCollector;
+import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import
org.apache.iotdb.pipe.api.customizer.collector.PipeCollectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.event.Event;
+
+import org.apache.commons.lang.NotImplementedException;
+
+import java.util.ArrayDeque;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+/**
+ * A {@link PipeCollector} that snapshots the TsFiles of one data region when it is started and
+ * then hands them out one at a time as {@link PipeTsFileInsertionEvent}s.
+ *
+ * <p>Collection process: get the DataRegion -> close all working TsFileProcessors -> list every
+ * TsFile reported by the region's TsFileManager.
+ */
+public class PipeHistoricalTsFileCollector implements PipeCollector {
+
+  /** Flipped to {@code true} by the first call to {@link #start()}; later calls are no-ops. */
+  private final AtomicBoolean hasBeenStarted;
+
+  /** Textual id of the data region; wrapped in a {@link DataRegionId} inside {@link #start()}. */
+  private final String dataRegionId;
+
+  /** Events not yet supplied; {@code null} before {@link #start()} and after {@link #close()}. */
+  private Queue<PipeTsFileInsertionEvent> pendingQueue;
+
+  /**
+   * Creates a collector for the given data region. No files are collected until {@link #start()}.
+   *
+   * @param dataRegionId textual id of the data region whose TsFiles will be collected
+   */
+  public PipeHistoricalTsFileCollector(String dataRegionId) {
+    this.hasBeenStarted = new AtomicBoolean(false);
+    this.dataRegionId = dataRegionId;
+  }
+
+  /**
+   * Not implemented yet.
+   *
+   * @throws NotImplementedException always
+   */
+  @Override
+  public void validate(PipeParameterValidator validator) throws Exception {
+    throw new NotImplementedException("Not implement for validate.");
+  }
+
+  /**
+   * Not implemented yet.
+   *
+   * @throws NotImplementedException always
+   */
+  @Override
+  public void customize(PipeParameters parameters, PipeCollectorRuntimeConfiguration configuration)
+      throws Exception {
+    throw new NotImplementedException("Not implement for customize.");
+  }
+
+  /**
+   * Collects the region's TsFiles into the pending queue. Only the first invocation does any work.
+   *
+   * <p>The data region's write lock is held while the working TsFileProcessors are closed and the
+   * file lists are read; the TsFileManager's read lock is held while the lists are copied.
+   */
+  @Override
+  public void start() {
+    // compareAndSet performs the check and the update as one atomic step, so two concurrent
+    // callers cannot both pass the guard and collect the files twice.
+    if (!hasBeenStarted.compareAndSet(false, true)) {
+      return;
+    }
+
+    // NOTE(review): getDataRegion's behavior for an unknown region id is not visible here;
+    // confirm it cannot return null before relying on this call chain.
+    final DataRegion dataRegion =
+        StorageEngine.getInstance().getDataRegion(new DataRegionId(dataRegionId));
+    dataRegion.writeLock("Pipe: collect historical TsFile");
+    try {
+      dataRegion.syncCloseAllWorkingTsFileProcessors();
+      final TsFileManager tsFileManager = dataRegion.getTsFileManager();
+
+      tsFileManager.readLock();
+      try {
+        // Presize the queue for both file lists.
+        pendingQueue = new ArrayDeque<>(tsFileManager.size(true) + tsFileManager.size(false));
+        // getTsFileList(true) first, then getTsFileList(false) -- presumably sequence files
+        // followed by unsequence files; confirm against TsFileManager.
+        pendingQueue.addAll(
+            tsFileManager.getTsFileList(true).stream()
+                .map(o -> new PipeTsFileInsertionEvent(o.getTsFile()))
+                .collect(Collectors.toList()));
+        pendingQueue.addAll(
+            tsFileManager.getTsFileList(false).stream()
+                .map(o -> new PipeTsFileInsertionEvent(o.getTsFile()))
+                .collect(Collectors.toList()));
+      } finally {
+        tsFileManager.readUnlock();
+      }
+    } finally {
+      dataRegion.writeUnlock();
+    }
+  }
+
+  /**
+   * Returns the next pending event.
+   *
+   * @return the next {@link PipeTsFileInsertionEvent}, or {@code null} when the queue is empty or
+   *     has not been created (not started yet, or already closed)
+   */
+  @Override
+  public Event supply() {
+    if (pendingQueue == null) {
+      return null;
+    }
+
+    return pendingQueue.poll();
+  }
+
+  /** Discards every event that has not been supplied and drops the queue. */
+  @Override
+  public void close() {
+    if (pendingQueue != null) {
+      pendingQueue.clear();
+      pendingQueue = null;
+    }
+  }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java
index 7282fb77bc..69d527e34b 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java
@@ -25,7 +25,6 @@ import
org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import java.io.File;
public class PipeTsFileInsertionEvent implements TsFileInsertionEvent {
-
private final File tsFile;
public PipeTsFileInsertionEvent(File tsFile) {