This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new aaf7fa2804 [IOTDB-5811] Pipe: PipeHistoricalCollector (#9700)
aaf7fa2804 is described below

commit aaf7fa2804f8e7daa2bf41b20e3b9118d2bb33f3
Author: yschengzi <[email protected]>
AuthorDate: Mon May 8 11:01:28 2023 +0800

    [IOTDB-5811] Pipe: PipeHistoricalCollector (#9700)
    
    Historical tsfile collect process:
    get DataRegion -> flush all TsFileProcessor -> get All TsFile
---
 .../iotdb/commons/consensus/DataRegionId.java      |   4 +
 .../historical/PipeHistoricalCollector.java        |  22 -----
 .../historical/PipeHistoricalTsFileCollector.java  | 110 +++++++++++++++++++++
 .../core/event/impl/PipeTsFileInsertionEvent.java  |   1 -
 4 files changed, 114 insertions(+), 23 deletions(-)

diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/DataRegionId.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/DataRegionId.java
index 03ec5e4d7d..655de94da0 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/DataRegionId.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/DataRegionId.java
@@ -27,6 +27,10 @@ public class DataRegionId extends ConsensusGroupId {
     this.id = id;
   }
 
+  public DataRegionId(String id) {
+    this.id = Integer.parseInt(id);
+  }
+
   @Override
   public TConsensusGroupType getType() {
     return TConsensusGroupType.DataRegion;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalCollector.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalCollector.java
deleted file mode 100644
index 6e680de847..0000000000
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalCollector.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.core.collector.historical;
-
-public class PipeHistoricalCollector {}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalTsFileCollector.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalTsFileCollector.java
new file mode 100644
index 0000000000..d35245fe89
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalTsFileCollector.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.collector.historical;
+
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.storagegroup.DataRegion;
+import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
+import org.apache.iotdb.db.pipe.core.event.impl.PipeTsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.PipeCollector;
+import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import 
org.apache.iotdb.pipe.api.customizer.collector.PipeCollectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.event.Event;
+
+import org.apache.commons.lang.NotImplementedException;
+
+import java.util.ArrayDeque;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+public class PipeHistoricalTsFileCollector implements PipeCollector {
+  private final AtomicBoolean hasBeenStarted;
+  private final String dataRegionId;
+  private Queue<PipeTsFileInsertionEvent> pendingQueue;
+
+  public PipeHistoricalTsFileCollector(String dataRegionId) {
+    this.hasBeenStarted = new AtomicBoolean(false);
+    this.dataRegionId = dataRegionId;
+  }
+
+  @Override
+  public void validate(PipeParameterValidator validator) throws Exception {
+    throw new NotImplementedException("Not implement for validate.");
+  }
+
+  @Override
+  public void customize(PipeParameters parameters, 
PipeCollectorRuntimeConfiguration configuration)
+      throws Exception {
+    throw new NotImplementedException("Not implement for customize.");
+  }
+
+  @Override
+  public void start() {
+    if (hasBeenStarted.get()) {
+      return;
+    }
+    hasBeenStarted.set(true);
+
+    DataRegion dataRegion =
+        StorageEngine.getInstance().getDataRegion(new 
DataRegionId(dataRegionId));
+    dataRegion.writeLock("Pipe: collect historical TsFile");
+    try {
+      dataRegion.syncCloseAllWorkingTsFileProcessors();
+      TsFileManager tsFileManager = dataRegion.getTsFileManager();
+
+      tsFileManager.readLock();
+      try {
+        pendingQueue = new ArrayDeque<>(tsFileManager.size(true) + 
tsFileManager.size(false));
+        pendingQueue.addAll(
+            tsFileManager.getTsFileList(true).stream()
+                .map(o -> new PipeTsFileInsertionEvent(o.getTsFile()))
+                .collect(Collectors.toList()));
+        pendingQueue.addAll(
+            tsFileManager.getTsFileList(false).stream()
+                .map(o -> new PipeTsFileInsertionEvent(o.getTsFile()))
+                .collect(Collectors.toList()));
+      } finally {
+        tsFileManager.readUnlock();
+      }
+    } finally {
+      dataRegion.writeUnlock();
+    }
+  }
+
+  @Override
+  public Event supply() {
+    if (pendingQueue == null) {
+      return null;
+    }
+
+    return pendingQueue.poll();
+  }
+
+  @Override
+  public void close() {
+    if (pendingQueue != null) {
+      pendingQueue.clear();
+      pendingQueue = null;
+    }
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java
index 7282fb77bc..69d527e34b 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java
@@ -25,7 +25,6 @@ import 
org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 import java.io.File;
 
 public class PipeTsFileInsertionEvent implements TsFileInsertionEvent {
-
   private final File tsFile;
 
   public PipeTsFileInsertionEvent(File tsFile) {

Reply via email to