This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch sink
in repository https://gitbox.apache.org/repos/asf/iotdb-extras.git

commit 22df722e36d5f31b84a219f77b68a729f60608b7
Author: Steve Yurong Su <[email protected]>
AuthorDate: Wed Mar 12 11:58:51 2025 +0800

    fix
---
 .../iotdb/collector/config/IoTDBCommonOptions.java |  4 +--
 .../plugin/builtin/annotation/TableModel.java      | 39 ----------------------
 .../plugin/builtin/annotation/TreeModel.java       | 39 ----------------------
 .../sink/compressor/PipeCompressorFactory.java     |  4 +--
 .../sink/compressor/PipeGZIPCompressor.java        |  3 +-
 .../builtin/sink/compressor/PipeLZ4Compressor.java |  3 +-
 .../sink/compressor/PipeLZMA2Compressor.java       |  3 +-
 .../sink/compressor/PipeSnappyCompressor.java      |  3 +-
 .../sink/compressor/PipeZSTDCompressor.java        |  1 +
 .../sink/constant/ColumnHeaderConstant.java        |  3 +-
 .../sink/constant/PipeConnectorConstant.java       |  3 +-
 .../sink/event/row/PipeDataTypeTransformer.java    |  1 +
 .../plugin/builtin/sink/event/row/PipeRow.java     |  1 +
 .../builtin/sink/event/row/PipeRowCollector.java   |  5 ++-
 .../event/tablet/PipeRawTabletInsertionEvent.java  |  1 +
 .../event/tsfile/PipeTsFileInsertionEvent.java     | 33 +++++++++---------
 ...meConnectorRetryTimesConfigurableException.java |  1 -
 .../pipe/PipeRuntimeCriticalException.java         |  1 -
 .../exception/pipe/PipeRuntimeExceptionType.java   |  1 -
 .../pipe/PipeRuntimeNonCriticalException.java      |  1 -
 .../PipeRuntimeOutOfMemoryCriticalException.java   |  1 -
 .../builtin/sink/limiter/GlobalRateLimiter.java    |  5 +--
 .../sink/limiter/PipeEndPointRateLimiter.java      | 10 +++---
 .../evolvable/batch/PipeTabletEventBatch.java      | 21 ++++++------
 .../evolvable/batch/PipeTabletEventPlainBatch.java | 29 ++++++++--------
 .../batch/PipeTabletEventTsFileBatch.java          | 23 +++++++------
 26 files changed, 83 insertions(+), 156 deletions(-)

diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/IoTDBCommonOptions.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/IoTDBCommonOptions.java
index 4b25278..ac2304f 100644
--- 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/IoTDBCommonOptions.java
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/IoTDBCommonOptions.java
@@ -19,6 +19,4 @@
 
 package org.apache.iotdb.collector.config;
 
-public class IoTDBCommonOptions extends Options{
-
-}
+public class IoTDBCommonOptions extends Options {}
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/annotation/TableModel.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/annotation/TableModel.java
deleted file mode 100644
index f73a1af..0000000
--- 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/annotation/TableModel.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.collector.plugin.builtin.annotation;
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-/**
- * Indicates that a plugin can be used in table model environments.
- *
- * <p>When implementing a custom {@link org.apache.iotdb.pipe.api.PipePlugin} 
that needs to operate
- * under table model settings, declare this annotation on the plugin class. 
Through the {@code
- * CREATE PIPEPLUGIN} statement, a plugin annotated with {@link TableModel} is 
valid for both tree
- * model connections and table model connections.
- *
- * @since 2.0.0
- */
-@Target(ElementType.TYPE)
-@Retention(RetentionPolicy.RUNTIME)
-public @interface TableModel {}
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/annotation/TreeModel.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/annotation/TreeModel.java
deleted file mode 100644
index 7e163fe..0000000
--- 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/annotation/TreeModel.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.collector.plugin.builtin.annotation;
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-/**
- * Indicates that a plugin can be used in tree model environments.
- *
- * <p>When implementing a custom {@link org.apache.iotdb.pipe.api.PipePlugin} 
that needs to operate
- * under tree model settings, declare this annotation on the plugin class. 
Through the {@code CREATE
- * PIPEPLUGIN} statement, a plugin annotated with {@link TreeModel} is valid 
for both tree model
- * connections and tree model connections.
- *
- * @since 2.0.0
- */
-@Target(ElementType.TYPE)
-@Retention(RetentionPolicy.RUNTIME)
-public @interface TreeModel {}
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/compressor/PipeCompressorFactory.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/compressor/PipeCompressorFactory.java
index df941f4..e2da42f 100644
--- 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/compressor/PipeCompressorFactory.java
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/compressor/PipeCompressorFactory.java
@@ -19,13 +19,13 @@
 
 package org.apache.iotdb.collector.plugin.builtin.sink.compressor;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import static 
org.apache.iotdb.collector.plugin.builtin.sink.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_GZIP;
 import static 
org.apache.iotdb.collector.plugin.builtin.sink.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_LZ4;
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/compressor/PipeGZIPCompressor.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/compressor/PipeGZIPCompressor.java
index 155af20..38c844e 100644
--- 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/compressor/PipeGZIPCompressor.java
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/compressor/PipeGZIPCompressor.java
@@ -19,11 +19,12 @@
 
 package org.apache.iotdb.collector.plugin.builtin.sink.compressor;
 
-import java.io.IOException;
 import org.apache.tsfile.compress.ICompressor;
 import org.apache.tsfile.compress.IUnCompressor;
 import org.apache.tsfile.file.metadata.enums.CompressionType;
 
+import java.io.IOException;
+
 public class PipeGZIPCompressor extends PipeCompressor {
 
   private static final ICompressor COMPRESSOR = 
ICompressor.getCompressor(CompressionType.GZIP);
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/compressor/PipeLZ4Compressor.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/compressor/PipeLZ4Compressor.java
index 2c0a529..9f441d8 100644
--- 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/compressor/PipeLZ4Compressor.java
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/compressor/PipeLZ4Compressor.java
@@ -19,11 +19,12 @@
 
 package org.apache.iotdb.collector.plugin.builtin.sink.compressor;
 
-import java.io.IOException;
 import org.apache.tsfile.compress.ICompressor;
 import org.apache.tsfile.compress.IUnCompressor;
 import org.apache.tsfile.file.metadata.enums.CompressionType;
 
+import java.io.IOException;
+
 public class PipeLZ4Compressor extends PipeCompressor {
 
   private static final ICompressor COMPRESSOR = 
ICompressor.getCompressor(CompressionType.LZ4);
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/compressor/PipeLZMA2Compressor.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/compressor/PipeLZMA2Compressor.java
index e964c95..6910579 100644
--- 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/compressor/PipeLZMA2Compressor.java
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/compressor/PipeLZMA2Compressor.java
@@ -19,11 +19,12 @@
 
 package org.apache.iotdb.collector.plugin.builtin.sink.compressor;
 
-import java.io.IOException;
 import org.apache.tsfile.compress.ICompressor;
 import org.apache.tsfile.compress.IUnCompressor;
 import org.apache.tsfile.file.metadata.enums.CompressionType;
 
+import java.io.IOException;
+
 public class PipeLZMA2Compressor extends PipeCompressor {
 
   private static final ICompressor COMPRESSOR = 
ICompressor.getCompressor(CompressionType.LZMA2);
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/compressor/PipeSnappyCompressor.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/compressor/PipeSnappyCompressor.java
index 7e84db5..996f95d 100644
--- 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/compressor/PipeSnappyCompressor.java
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/compressor/PipeSnappyCompressor.java
@@ -19,11 +19,12 @@
 
 package org.apache.iotdb.collector.plugin.builtin.sink.compressor;
 
-import java.io.IOException;
 import org.apache.tsfile.compress.ICompressor;
 import org.apache.tsfile.compress.IUnCompressor;
 import org.apache.tsfile.file.metadata.enums.CompressionType;
 
+import java.io.IOException;
+
 public class PipeSnappyCompressor extends PipeCompressor {
 
   private static final ICompressor COMPRESSOR = 
ICompressor.getCompressor(CompressionType.SNAPPY);
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/compressor/PipeZSTDCompressor.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/compressor/PipeZSTDCompressor.java
index 2568770..67e40ac 100644
--- 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/compressor/PipeZSTDCompressor.java
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/compressor/PipeZSTDCompressor.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.collector.plugin.builtin.sink.compressor;
 
 import com.github.luben.zstd.Zstd;
+
 import java.io.IOException;
 
 public class PipeZSTDCompressor extends PipeCompressor {
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/constant/ColumnHeaderConstant.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/constant/ColumnHeaderConstant.java
index 23a4b42..4baa3ed 100644
--- 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/constant/ColumnHeaderConstant.java
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/constant/ColumnHeaderConstant.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.collector.plugin.builtin.sink.constant;
 
-
 public class ColumnHeaderConstant {
 
   private ColumnHeaderConstant() {
@@ -272,7 +271,7 @@ public class ColumnHeaderConstant {
   public static final String CURRENT_DATABASE = "CurrentDatabase";
 
   public static final String CURRENT_SQL_DIALECT = "CurrentSqlDialect";
-/*
+  /*
   public static final List<ColumnHeader> lastQueryColumnHeaders =
       ImmutableList.of(
           new ColumnHeader(TIMESERIES, TSDataType.TEXT),
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/constant/PipeConnectorConstant.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/constant/PipeConnectorConstant.java
index 706a2ae..08182eb 100644
--- 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/constant/PipeConnectorConstant.java
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/constant/PipeConnectorConstant.java
@@ -19,9 +19,10 @@
 
 package org.apache.iotdb.collector.plugin.builtin.sink.constant;
 
-import com.github.luben.zstd.Zstd;
 import org.apache.iotdb.collector.config.PipeOptions;
 
+import com.github.luben.zstd.Zstd;
+
 import java.io.File;
 import java.util.Arrays;
 import java.util.Collections;
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/event/row/PipeDataTypeTransformer.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/event/row/PipeDataTypeTransformer.java
index a4ee481..85883ff 100644
--- 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/event/row/PipeDataTypeTransformer.java
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/event/row/PipeDataTypeTransformer.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.collector.plugin.builtin.sink.event.row;
 
 import org.apache.iotdb.pipe.api.type.Type;
+
 import org.apache.tsfile.enums.TSDataType;
 
 import java.util.List;
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/event/row/PipeRow.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/event/row/PipeRow.java
index 84d074e..ad123a9 100644
--- 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/event/row/PipeRow.java
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/event/row/PipeRow.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.pipe.api.access.Row;
 import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
 import org.apache.iotdb.pipe.api.type.Binary;
 import org.apache.iotdb.pipe.api.type.Type;
+
 import org.apache.tsfile.common.conf.TSFileConfig;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.read.common.Path;
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/event/row/PipeRowCollector.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/event/row/PipeRowCollector.java
index 38d8258..73e86ef 100644
--- 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/event/row/PipeRowCollector.java
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/event/row/PipeRowCollector.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.pipe.api.collector.RowCollector;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.pipe.api.type.Binary;
+
 import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.write.record.Tablet;
 import org.apache.tsfile.write.schema.IMeasurementSchema;
@@ -56,9 +57,7 @@ public class PipeRowCollector implements RowCollector {
   }
 
   public PipeRowCollector(
-      PipeRawTabletInsertionEvent sourceEvent,
-      String sourceEventDataBase,
-      Boolean isTableModel) {
+      PipeRawTabletInsertionEvent sourceEvent, String sourceEventDataBase, 
Boolean isTableModel) {
     this.sourceEvent = sourceEvent;
     this.sourceEventDataBaseName = sourceEventDataBase;
     this.isTableModel = isTableModel;
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/event/tablet/PipeRawTabletInsertionEvent.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/event/tablet/PipeRawTabletInsertionEvent.java
index f249143..b7de5b3 100644
--- 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/event/tablet/PipeRawTabletInsertionEvent.java
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/event/tablet/PipeRawTabletInsertionEvent.java
@@ -28,6 +28,7 @@ import 
org.apache.iotdb.collector.plugin.builtin.sink.resource.ref.PipePhantomRe
 import org.apache.iotdb.pipe.api.access.Row;
 import org.apache.iotdb.pipe.api.collector.RowCollector;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+
 import org.apache.tsfile.utils.RamUsageEstimator;
 import org.apache.tsfile.write.record.Tablet;
 
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/event/tsfile/PipeTsFileInsertionEvent.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/event/tsfile/PipeTsFileInsertionEvent.java
index e383284..fcc64c2 100644
--- 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/event/tsfile/PipeTsFileInsertionEvent.java
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/event/tsfile/PipeTsFileInsertionEvent.java
@@ -19,19 +19,6 @@
 
 package org.apache.iotdb.collector.plugin.builtin.sink.event.tsfile;
 
-import static org.apache.tsfile.common.constant.TsFileConstant.PATH_ROOT;
-import static org.apache.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
 import 
org.apache.iotdb.collector.plugin.builtin.sink.datastructure.pattern.TablePattern;
 import 
org.apache.iotdb.collector.plugin.builtin.sink.datastructure.pattern.TreePattern;
 import 
org.apache.iotdb.collector.plugin.builtin.sink.event.tablet.PipeRawTabletInsertionEvent;
@@ -42,11 +29,25 @@ import 
org.apache.iotdb.collector.plugin.builtin.sink.resource.ref.PipePhantomRe
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeException;
+
 import org.apache.tsfile.file.metadata.IDeviceID;
 import org.apache.tsfile.file.metadata.PlainDeviceID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.tsfile.common.constant.TsFileConstant.PATH_ROOT;
+import static org.apache.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
+
 public class PipeTsFileInsertionEvent extends PipeInsertionEvent
     implements TsFileInsertionEvent, ReferenceTrackableEvent {
 
@@ -260,9 +261,11 @@ public class PipeTsFileInsertionEvent extends 
PipeInsertionEvent
   @Override
   public boolean internallyIncreaseResourceReferenceCount(final String 
holderMessage) {
     try {
-      // tsFile = 
PipeDataNodeResourceManager.tsfile().increaseFileReference(tsFile, true, 
resource);
+      // tsFile = 
PipeDataNodeResourceManager.tsfile().increaseFileReference(tsFile, true,
+      // resource);
       // if (isWithMod) {
-      //   modFile = 
PipeDataNodeResourceManager.tsfile().increaseFileReference(modFile, false, 
null);
+      //   modFile = 
PipeDataNodeResourceManager.tsfile().increaseFileReference(modFile, false,
+      // null);
       // }
       return true;
     } catch (final Exception e) {
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/exception/pipe/PipeRuntimeConnectorRetryTimesConfigurableException.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/exception/pipe/PipeRuntimeConnectorRetryTimesConfigurableException.java
index 8d215d8..786be00 100644
--- 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/exception/pipe/PipeRuntimeConnectorRetryTimesConfigurableException.java
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/exception/pipe/PipeRuntimeConnectorRetryTimesConfigurableException.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.collector.plugin.builtin.sink.exception.pipe;
 
-
 public class PipeRuntimeConnectorRetryTimesConfigurableException
     extends PipeRuntimeConnectorCriticalException {
 
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/exception/pipe/PipeRuntimeCriticalException.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/exception/pipe/PipeRuntimeCriticalException.java
index fda9c61..4fe301a 100644
--- 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/exception/pipe/PipeRuntimeCriticalException.java
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/exception/pipe/PipeRuntimeCriticalException.java
@@ -22,7 +22,6 @@ package 
org.apache.iotdb.collector.plugin.builtin.sink.exception.pipe;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.Objects;
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/exception/pipe/PipeRuntimeExceptionType.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/exception/pipe/PipeRuntimeExceptionType.java
index 9f39441..fdc38c8 100644
--- 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/exception/pipe/PipeRuntimeExceptionType.java
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/exception/pipe/PipeRuntimeExceptionType.java
@@ -22,7 +22,6 @@ package 
org.apache.iotdb.collector.plugin.builtin.sink.exception.pipe;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/exception/pipe/PipeRuntimeNonCriticalException.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/exception/pipe/PipeRuntimeNonCriticalException.java
index a539d2e..92ebff1 100644
--- 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/exception/pipe/PipeRuntimeNonCriticalException.java
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/exception/pipe/PipeRuntimeNonCriticalException.java
@@ -22,7 +22,6 @@ package 
org.apache.iotdb.collector.plugin.builtin.sink.exception.pipe;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.Objects;
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/exception/pipe/PipeRuntimeOutOfMemoryCriticalException.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/exception/pipe/PipeRuntimeOutOfMemoryCriticalException.java
index 82cc89b..bca4676 100644
--- 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/exception/pipe/PipeRuntimeOutOfMemoryCriticalException.java
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/exception/pipe/PipeRuntimeOutOfMemoryCriticalException.java
@@ -22,7 +22,6 @@ package 
org.apache.iotdb.collector.plugin.builtin.sink.exception.pipe;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.Objects;
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/limiter/GlobalRateLimiter.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/limiter/GlobalRateLimiter.java
index fbd7062..8fc6b39 100644
--- 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/limiter/GlobalRateLimiter.java
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/limiter/GlobalRateLimiter.java
@@ -19,11 +19,12 @@
 
 package org.apache.iotdb.collector.plugin.builtin.sink.limiter;
 
+import org.apache.iotdb.collector.config.PipeOptions;
+
 import com.google.common.util.concurrent.AtomicDouble;
 import com.google.common.util.concurrent.RateLimiter;
-import java.util.concurrent.TimeUnit;
 
-import org.apache.iotdb.collector.config.PipeOptions;
+import java.util.concurrent.TimeUnit;
 
 /** This is a global rate limiter for all connectors. */
 public class GlobalRateLimiter {
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/limiter/PipeEndPointRateLimiter.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/limiter/PipeEndPointRateLimiter.java
index 39ac4ba..d1d526a 100644
--- 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/limiter/PipeEndPointRateLimiter.java
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/limiter/PipeEndPointRateLimiter.java
@@ -19,14 +19,15 @@
 
 package org.apache.iotdb.collector.plugin.builtin.sink.limiter;
 
+import org.apache.iotdb.collector.config.PipeOptions;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+
 import com.google.common.util.concurrent.RateLimiter;
+
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.iotdb.collector.config.PipeOptions;
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-
 public class PipeEndPointRateLimiter {
 
   // The task agent is used to check if the pipe is still alive
@@ -72,8 +73,7 @@ public class PipeEndPointRateLimiter {
     while (!rateLimiter.tryAcquire(
         bytes,
         PipeOptions.RATE_LIMITER_HOT_RELOAD_CHECK_INTERVAL_MS.value(),
-        TimeUnit.MILLISECONDS)) {
-    }
+        TimeUnit.MILLISECONDS)) {}
     return true;
   }
 }
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/payload/evolvable/batch/PipeTabletEventBatch.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/payload/evolvable/batch/PipeTabletEventBatch.java
index 716d119..3d8d477 100644
--- 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/payload/evolvable/batch/PipeTabletEventBatch.java
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/payload/evolvable/batch/PipeTabletEventBatch.java
@@ -19,18 +19,18 @@
 
 package org.apache.iotdb.collector.plugin.builtin.sink.payload.evolvable.batch;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-
 import 
org.apache.iotdb.collector.plugin.builtin.sink.event.tablet.PipeRawTabletInsertionEvent;
-import 
org.apache.iotdb.collector.plugin.builtin.sink.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
 public abstract class PipeTabletEventBatch implements AutoCloseable {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTabletEventBatch.class);
@@ -62,7 +62,8 @@ public abstract class PipeTabletEventBatch implements 
AutoCloseable {
     //         .setExpandCallback(
     //             (oldMemory, newMemory) ->
     //                 LOGGER.info(
-    //                     "The batch size limit has expanded from {} to {}.", 
oldMemory, newMemory));
+    //                     "The batch size limit has expanded from {} to {}.", 
oldMemory,
+    // newMemory));
 
     if (getMaxBatchSizeInBytes() != requestMaxBatchSizeInBytes) {
       LOGGER.info(
@@ -79,8 +80,7 @@ public abstract class PipeTabletEventBatch implements 
AutoCloseable {
    * @param event the given {@link Event}
    * @return {@code true} if the batch can be transferred
    */
-  public synchronized boolean onEvent(final TabletInsertionEvent event)
-      throws IOException {
+  public synchronized boolean onEvent(final TabletInsertionEvent event) throws 
IOException {
     if (isClosed || !(event instanceof PipeRawTabletInsertionEvent)) {
       return false;
     }
@@ -108,8 +108,7 @@ public abstract class PipeTabletEventBatch implements 
AutoCloseable {
    *     cached and not emitted in this batch. If there are failure 
encountered, just throw
    *     exceptions and do not return {@code false} here.
    */
-  protected abstract boolean constructBatch(final TabletInsertionEvent event)
-      throws IOException;
+  protected abstract boolean constructBatch(final TabletInsertionEvent event) 
throws IOException;
 
   public boolean shouldEmit() {
     return totalBufferSize >= getMaxBatchSizeInBytes()
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
index 03e536b..ad9a1ad 100644
--- 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
@@ -19,23 +19,24 @@
 
 package org.apache.iotdb.collector.plugin.builtin.sink.payload.evolvable.batch;
 
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 import 
org.apache.iotdb.collector.plugin.builtin.sink.event.tablet.PipeRawTabletInsertionEvent;
 import 
org.apache.iotdb.collector.plugin.builtin.sink.payload.evolvable.request.PipeTransferTabletBatchReqV2;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+
 import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.utils.PublicBAOS;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 public class PipeTabletEventPlainBatch extends PipeTabletEventBatch {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTabletEventPlainBatch.class);
@@ -57,13 +58,13 @@ public class PipeTabletEventPlainBatch extends 
PipeTabletEventBatch {
   }
 
   @Override
-  protected boolean constructBatch(final TabletInsertionEvent event)
-      throws IOException {
+  protected boolean constructBatch(final TabletInsertionEvent event) throws 
IOException {
     final int bufferSize = buildTabletInsertionBuffer(event);
     totalBufferSize += bufferSize;
     pipe2BytesAccumulated.compute(
         new Pair<>(
-            ((PipeRawTabletInsertionEvent) event).getPipeName(), 
((PipeRawTabletInsertionEvent) event).getCreationTime()),
+            ((PipeRawTabletInsertionEvent) event).getPipeName(),
+            ((PipeRawTabletInsertionEvent) event).getCreationTime()),
         (pipeName, bytesAccumulated) ->
             bytesAccumulated == null ? bufferSize : bytesAccumulated + 
bufferSize);
     return true;
@@ -102,8 +103,7 @@ public class PipeTabletEventPlainBatch extends 
PipeTabletEventBatch {
     return pipe2BytesAccumulated;
   }
 
-  private int buildTabletInsertionBuffer(final TabletInsertionEvent event)
-      throws IOException {
+  private int buildTabletInsertionBuffer(final TabletInsertionEvent event) 
throws IOException {
     int databaseEstimateSize = 0;
     final ByteBuffer buffer;
     // if (event instanceof PipeInsertNodeTabletInsertionEvent) {
@@ -130,7 +130,8 @@ public class PipeTabletEventPlainBatch extends 
PipeTabletEventBatch {
     //     if (pipeInsertNodeTabletInsertionEvent.isTableModelEvent()) {
     //       databaseEstimateSize =
     //           
pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName().length();
-    //       
insertNodeDataBases.add(pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName());
+    //
+    // 
insertNodeDataBases.add(pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName());
     //     } else {
     //       databaseEstimateSize = 4;
     //       insertNodeDataBases.add(TREE_MODEL_DATABASE_PLACEHOLDER);
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
index f1805cc..fcb577d 100644
--- 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
@@ -19,16 +19,6 @@
 
 package org.apache.iotdb.collector.plugin.builtin.sink.payload.evolvable.batch;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.atomic.AtomicLong;
-
 import 
org.apache.iotdb.collector.plugin.builtin.sink.event.tablet.PipeRawTabletInsertionEvent;
 import 
org.apache.iotdb.collector.plugin.builtin.sink.utils.PipeMemoryWeightUtil;
 import 
org.apache.iotdb.collector.plugin.builtin.sink.utils.builder.PipeTableModeTsFileBuilder;
@@ -37,12 +27,23 @@ import 
org.apache.iotdb.collector.plugin.builtin.sink.utils.builder.PipeTsFileBu
 import 
org.apache.iotdb.collector.plugin.builtin.sink.utils.sorter.PipeTableModelTabletEventSorter;
 import 
org.apache.iotdb.collector.plugin.builtin.sink.utils.sorter.PipeTreeModelTabletEventSorter;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+
 import org.apache.tsfile.exception.write.WriteProcessException;
 import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.write.record.Tablet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
+
 public class PipeTabletEventTsFileBatch extends PipeTabletEventBatch {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTabletEventTsFileBatch.class);
@@ -92,7 +93,7 @@ public class PipeTabletEventTsFileBatch extends 
PipeTabletEventBatch {
     //     }
     //   }
     // } else
-      if (event instanceof PipeRawTabletInsertionEvent) {
+    if (event instanceof PipeRawTabletInsertionEvent) {
       final PipeRawTabletInsertionEvent rawTabletInsertionEvent =
           (PipeRawTabletInsertionEvent) event;
       final Tablet tablet = rawTabletInsertionEvent.convertToTablet();


Reply via email to