This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch sink in repository https://gitbox.apache.org/repos/asf/iotdb-extras.git
commit 22df722e36d5f31b84a219f77b68a729f60608b7 Author: Steve Yurong Su <[email protected]> AuthorDate: Wed Mar 12 11:58:51 2025 +0800 fix --- .../iotdb/collector/config/IoTDBCommonOptions.java | 4 +-- .../plugin/builtin/annotation/TableModel.java | 39 ---------------------- .../plugin/builtin/annotation/TreeModel.java | 39 ---------------------- .../sink/compressor/PipeCompressorFactory.java | 4 +-- .../sink/compressor/PipeGZIPCompressor.java | 3 +- .../builtin/sink/compressor/PipeLZ4Compressor.java | 3 +- .../sink/compressor/PipeLZMA2Compressor.java | 3 +- .../sink/compressor/PipeSnappyCompressor.java | 3 +- .../sink/compressor/PipeZSTDCompressor.java | 1 + .../sink/constant/ColumnHeaderConstant.java | 3 +- .../sink/constant/PipeConnectorConstant.java | 3 +- .../sink/event/row/PipeDataTypeTransformer.java | 1 + .../plugin/builtin/sink/event/row/PipeRow.java | 1 + .../builtin/sink/event/row/PipeRowCollector.java | 5 ++- .../event/tablet/PipeRawTabletInsertionEvent.java | 1 + .../event/tsfile/PipeTsFileInsertionEvent.java | 33 +++++++++--------- ...meConnectorRetryTimesConfigurableException.java | 1 - .../pipe/PipeRuntimeCriticalException.java | 1 - .../exception/pipe/PipeRuntimeExceptionType.java | 1 - .../pipe/PipeRuntimeNonCriticalException.java | 1 - .../PipeRuntimeOutOfMemoryCriticalException.java | 1 - .../builtin/sink/limiter/GlobalRateLimiter.java | 5 +-- .../sink/limiter/PipeEndPointRateLimiter.java | 10 +++--- .../evolvable/batch/PipeTabletEventBatch.java | 21 ++++++------ .../evolvable/batch/PipeTabletEventPlainBatch.java | 29 ++++++++-------- .../batch/PipeTabletEventTsFileBatch.java | 23 +++++++------ 26 files changed, 83 insertions(+), 156 deletions(-) diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/IoTDBCommonOptions.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/IoTDBCommonOptions.java index 4b25278..ac2304f 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/IoTDBCommonOptions.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/IoTDBCommonOptions.java @@ -19,6 +19,4 @@ package org.apache.iotdb.collector.config; -public class IoTDBCommonOptions extends Options{ - -} +public class IoTDBCommonOptions extends Options {} diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/annotation/TableModel.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/annotation/TableModel.java deleted file mode 100644 index f73a1af..0000000 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/annotation/TableModel.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.collector.plugin.builtin.annotation; - -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; - -/** - * Indicates that a plugin can be used in table model environments. - * - * <p>When implementing a custom {@link org.apache.iotdb.pipe.api.PipePlugin} that needs to operate - * under table model settings, declare this annotation on the plugin class. Through the {@code - * CREATE PIPEPLUGIN} statement, a plugin annotated with {@link TableModel} is valid for both tree - * model connections and table model connections. - * - * @since 2.0.0 - */ -@Target(ElementType.TYPE) -@Retention(RetentionPolicy.RUNTIME) -public @interface TableModel {} diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/annotation/TreeModel.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/annotation/TreeModel.java deleted file mode 100644 index 7e163fe..0000000 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/annotation/TreeModel.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.collector.plugin.builtin.annotation; - -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; - -/** - * Indicates that a plugin can be used in tree model environments. - * - * <p>When implementing a custom {@link org.apache.iotdb.pipe.api.PipePlugin} that needs to operate - * under tree model settings, declare this annotation on the plugin class. Through the {@code CREATE - * PIPEPLUGIN} statement, a plugin annotated with {@link TreeModel} is valid for both tree model - * connections and tree model connections. - * - * @since 2.0.0 - */ -@Target(ElementType.TYPE) -@Retention(RetentionPolicy.RUNTIME) -public @interface TreeModel {} diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/compressor/PipeCompressorFactory.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/compressor/PipeCompressorFactory.java index df941f4..e2da42f 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/compressor/PipeCompressorFactory.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/compressor/PipeCompressorFactory.java @@ -19,13 +19,13 @@ package org.apache.iotdb.collector.plugin.builtin.sink.compressor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import static org.apache.iotdb.collector.plugin.builtin.sink.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_GZIP; import static org.apache.iotdb.collector.plugin.builtin.sink.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_LZ4; diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/compressor/PipeGZIPCompressor.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/compressor/PipeGZIPCompressor.java index 155af20..38c844e 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/compressor/PipeGZIPCompressor.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/compressor/PipeGZIPCompressor.java @@ -19,11 +19,12 @@ package org.apache.iotdb.collector.plugin.builtin.sink.compressor; -import java.io.IOException; import org.apache.tsfile.compress.ICompressor; import org.apache.tsfile.compress.IUnCompressor; import org.apache.tsfile.file.metadata.enums.CompressionType; +import java.io.IOException; + public class PipeGZIPCompressor extends PipeCompressor { private static final ICompressor COMPRESSOR = ICompressor.getCompressor(CompressionType.GZIP); diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/compressor/PipeLZ4Compressor.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/compressor/PipeLZ4Compressor.java index 2c0a529..9f441d8 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/compressor/PipeLZ4Compressor.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/compressor/PipeLZ4Compressor.java @@ -19,11 +19,12 @@ package org.apache.iotdb.collector.plugin.builtin.sink.compressor; -import java.io.IOException; import org.apache.tsfile.compress.ICompressor; import org.apache.tsfile.compress.IUnCompressor; import org.apache.tsfile.file.metadata.enums.CompressionType; +import java.io.IOException; + public class PipeLZ4Compressor extends PipeCompressor { private static final ICompressor COMPRESSOR = ICompressor.getCompressor(CompressionType.LZ4); diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/compressor/PipeLZMA2Compressor.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/compressor/PipeLZMA2Compressor.java index e964c95..6910579 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/compressor/PipeLZMA2Compressor.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/compressor/PipeLZMA2Compressor.java @@ -19,11 +19,12 @@ package org.apache.iotdb.collector.plugin.builtin.sink.compressor; -import java.io.IOException; import org.apache.tsfile.compress.ICompressor; import org.apache.tsfile.compress.IUnCompressor; import org.apache.tsfile.file.metadata.enums.CompressionType; +import java.io.IOException; + public class PipeLZMA2Compressor extends PipeCompressor { private static final ICompressor COMPRESSOR = ICompressor.getCompressor(CompressionType.LZMA2); diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/compressor/PipeSnappyCompressor.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/compressor/PipeSnappyCompressor.java index 7e84db5..996f95d 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/compressor/PipeSnappyCompressor.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/compressor/PipeSnappyCompressor.java @@ -19,11 +19,12 @@ package org.apache.iotdb.collector.plugin.builtin.sink.compressor; -import java.io.IOException; import org.apache.tsfile.compress.ICompressor; import org.apache.tsfile.compress.IUnCompressor; import org.apache.tsfile.file.metadata.enums.CompressionType; +import java.io.IOException; + public class PipeSnappyCompressor extends PipeCompressor { private static final ICompressor COMPRESSOR = ICompressor.getCompressor(CompressionType.SNAPPY); diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/compressor/PipeZSTDCompressor.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/compressor/PipeZSTDCompressor.java index 2568770..67e40ac 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/compressor/PipeZSTDCompressor.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/compressor/PipeZSTDCompressor.java @@ -20,6 +20,7 @@ package org.apache.iotdb.collector.plugin.builtin.sink.compressor; import com.github.luben.zstd.Zstd; + import java.io.IOException; public class PipeZSTDCompressor extends PipeCompressor { diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/constant/ColumnHeaderConstant.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/constant/ColumnHeaderConstant.java index 23a4b42..4baa3ed 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/constant/ColumnHeaderConstant.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/constant/ColumnHeaderConstant.java @@ -19,7 +19,6 @@ package org.apache.iotdb.collector.plugin.builtin.sink.constant; - public class ColumnHeaderConstant { private ColumnHeaderConstant() { @@ -272,7 +271,7 @@ public class ColumnHeaderConstant { public static final String CURRENT_DATABASE = "CurrentDatabase"; public static final String CURRENT_SQL_DIALECT = "CurrentSqlDialect"; -/* + /* public static final List<ColumnHeader> lastQueryColumnHeaders = ImmutableList.of( new ColumnHeader(TIMESERIES, TSDataType.TEXT), diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/constant/PipeConnectorConstant.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/constant/PipeConnectorConstant.java index 706a2ae..08182eb 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/constant/PipeConnectorConstant.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/constant/PipeConnectorConstant.java @@ -19,9 +19,10 @@ package org.apache.iotdb.collector.plugin.builtin.sink.constant; -import com.github.luben.zstd.Zstd; import org.apache.iotdb.collector.config.PipeOptions; +import com.github.luben.zstd.Zstd; + import java.io.File; import java.util.Arrays; import java.util.Collections; diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/event/row/PipeDataTypeTransformer.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/event/row/PipeDataTypeTransformer.java index a4ee481..85883ff 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/event/row/PipeDataTypeTransformer.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/event/row/PipeDataTypeTransformer.java @@ -20,6 +20,7 @@ package org.apache.iotdb.collector.plugin.builtin.sink.event.row; import org.apache.iotdb.pipe.api.type.Type; + import org.apache.tsfile.enums.TSDataType; import java.util.List; diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/event/row/PipeRow.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/event/row/PipeRow.java index 84d074e..ad123a9 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/event/row/PipeRow.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/event/row/PipeRow.java @@ -23,6 +23,7 @@ import org.apache.iotdb.pipe.api.access.Row; import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException; import org.apache.iotdb.pipe.api.type.Binary; import org.apache.iotdb.pipe.api.type.Type; + import org.apache.tsfile.common.conf.TSFileConfig; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.read.common.Path; diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/event/row/PipeRowCollector.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/event/row/PipeRowCollector.java index 38d8258..73e86ef 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/event/row/PipeRowCollector.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/event/row/PipeRowCollector.java @@ -26,6 +26,7 @@ import org.apache.iotdb.pipe.api.collector.RowCollector; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.iotdb.pipe.api.type.Binary; + import org.apache.tsfile.utils.Pair; import org.apache.tsfile.write.record.Tablet; import org.apache.tsfile.write.schema.IMeasurementSchema; @@ -56,9 +57,7 @@ public class PipeRowCollector implements RowCollector { } public PipeRowCollector( - PipeRawTabletInsertionEvent sourceEvent, - String sourceEventDataBase, - Boolean isTableModel) { + PipeRawTabletInsertionEvent sourceEvent, String sourceEventDataBase, Boolean isTableModel) { this.sourceEvent = sourceEvent; this.sourceEventDataBaseName = sourceEventDataBase; this.isTableModel = isTableModel; diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/event/tablet/PipeRawTabletInsertionEvent.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/event/tablet/PipeRawTabletInsertionEvent.java index f249143..b7de5b3 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/event/tablet/PipeRawTabletInsertionEvent.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/event/tablet/PipeRawTabletInsertionEvent.java @@ -28,6 +28,7 @@ import org.apache.iotdb.collector.plugin.builtin.sink.resource.ref.PipePhantomRe import org.apache.iotdb.pipe.api.access.Row; import org.apache.iotdb.pipe.api.collector.RowCollector; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; + import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.write.record.Tablet; diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/event/tsfile/PipeTsFileInsertionEvent.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/event/tsfile/PipeTsFileInsertionEvent.java index e383284..fcc64c2 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/event/tsfile/PipeTsFileInsertionEvent.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/event/tsfile/PipeTsFileInsertionEvent.java @@ -19,19 +19,6 @@ package org.apache.iotdb.collector.plugin.builtin.sink.event.tsfile; -import static org.apache.tsfile.common.constant.TsFileConstant.PATH_ROOT; -import static org.apache.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR; - -import java.io.File; -import java.io.IOException; -import java.util.Collections; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; - import org.apache.iotdb.collector.plugin.builtin.sink.datastructure.pattern.TablePattern; import org.apache.iotdb.collector.plugin.builtin.sink.datastructure.pattern.TreePattern; import org.apache.iotdb.collector.plugin.builtin.sink.event.tablet.PipeRawTabletInsertionEvent; @@ -42,11 +29,25 @@ import org.apache.iotdb.collector.plugin.builtin.sink.resource.ref.PipePhantomRe import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; import org.apache.iotdb.pipe.api.exception.PipeException; + import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.file.metadata.PlainDeviceID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.tsfile.common.constant.TsFileConstant.PATH_ROOT; +import static org.apache.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR; + public class PipeTsFileInsertionEvent extends PipeInsertionEvent implements TsFileInsertionEvent, ReferenceTrackableEvent { @@ -260,9 +261,11 @@ public class PipeTsFileInsertionEvent extends PipeInsertionEvent @Override public boolean internallyIncreaseResourceReferenceCount(final String holderMessage) { try { - // tsFile = PipeDataNodeResourceManager.tsfile().increaseFileReference(tsFile, true, resource); + // tsFile = PipeDataNodeResourceManager.tsfile().increaseFileReference(tsFile, true, + // resource); // if (isWithMod) { - // modFile = PipeDataNodeResourceManager.tsfile().increaseFileReference(modFile, false, null); + // modFile = PipeDataNodeResourceManager.tsfile().increaseFileReference(modFile, false, + // null); // } return true; } catch (final Exception e) { diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/exception/pipe/PipeRuntimeConnectorRetryTimesConfigurableException.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/exception/pipe/PipeRuntimeConnectorRetryTimesConfigurableException.java index 8d215d8..786be00 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/exception/pipe/PipeRuntimeConnectorRetryTimesConfigurableException.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/exception/pipe/PipeRuntimeConnectorRetryTimesConfigurableException.java @@ -19,7 +19,6 @@ package org.apache.iotdb.collector.plugin.builtin.sink.exception.pipe; - public class PipeRuntimeConnectorRetryTimesConfigurableException extends PipeRuntimeConnectorCriticalException { diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/exception/pipe/PipeRuntimeCriticalException.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/exception/pipe/PipeRuntimeCriticalException.java index fda9c61..4fe301a 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/exception/pipe/PipeRuntimeCriticalException.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/exception/pipe/PipeRuntimeCriticalException.java @@ -22,7 +22,6 @@ package org.apache.iotdb.collector.plugin.builtin.sink.exception.pipe; import org.apache.tsfile.utils.ReadWriteIOUtils; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; import java.util.Objects; diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/exception/pipe/PipeRuntimeExceptionType.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/exception/pipe/PipeRuntimeExceptionType.java index 9f39441..fdc38c8 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/exception/pipe/PipeRuntimeExceptionType.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/exception/pipe/PipeRuntimeExceptionType.java @@ -22,7 +22,6 @@ package org.apache.iotdb.collector.plugin.builtin.sink.exception.pipe; import org.apache.tsfile.utils.ReadWriteIOUtils; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/exception/pipe/PipeRuntimeNonCriticalException.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/exception/pipe/PipeRuntimeNonCriticalException.java index a539d2e..92ebff1 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/exception/pipe/PipeRuntimeNonCriticalException.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/exception/pipe/PipeRuntimeNonCriticalException.java @@ -22,7 +22,6 @@ package org.apache.iotdb.collector.plugin.builtin.sink.exception.pipe; import org.apache.tsfile.utils.ReadWriteIOUtils; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; import java.util.Objects; diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/exception/pipe/PipeRuntimeOutOfMemoryCriticalException.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/exception/pipe/PipeRuntimeOutOfMemoryCriticalException.java index 82cc89b..bca4676 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/exception/pipe/PipeRuntimeOutOfMemoryCriticalException.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/exception/pipe/PipeRuntimeOutOfMemoryCriticalException.java @@ -22,7 +22,6 @@ package org.apache.iotdb.collector.plugin.builtin.sink.exception.pipe; import org.apache.tsfile.utils.ReadWriteIOUtils; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; import java.util.Objects; diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/limiter/GlobalRateLimiter.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/limiter/GlobalRateLimiter.java index fbd7062..8fc6b39 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/limiter/GlobalRateLimiter.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/limiter/GlobalRateLimiter.java @@ -19,11 +19,12 @@ package org.apache.iotdb.collector.plugin.builtin.sink.limiter; +import org.apache.iotdb.collector.config.PipeOptions; + import com.google.common.util.concurrent.AtomicDouble; import com.google.common.util.concurrent.RateLimiter; -import java.util.concurrent.TimeUnit; -import org.apache.iotdb.collector.config.PipeOptions; +import java.util.concurrent.TimeUnit; /** This is a global rate limiter for all connectors. */ public class GlobalRateLimiter { diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/limiter/PipeEndPointRateLimiter.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/limiter/PipeEndPointRateLimiter.java index 39ac4ba..d1d526a 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/limiter/PipeEndPointRateLimiter.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/limiter/PipeEndPointRateLimiter.java @@ -19,14 +19,15 @@ package org.apache.iotdb.collector.plugin.builtin.sink.limiter; +import org.apache.iotdb.collector.config.PipeOptions; +import org.apache.iotdb.common.rpc.thrift.TEndPoint; + import com.google.common.util.concurrent.RateLimiter; + import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; -import org.apache.iotdb.collector.config.PipeOptions; -import org.apache.iotdb.common.rpc.thrift.TEndPoint; - public class PipeEndPointRateLimiter { // The task agent is used to check if the pipe is still alive @@ -72,8 +73,7 @@ public class PipeEndPointRateLimiter { while (!rateLimiter.tryAcquire( bytes, PipeOptions.RATE_LIMITER_HOT_RELOAD_CHECK_INTERVAL_MS.value(), - TimeUnit.MILLISECONDS)) { - } + TimeUnit.MILLISECONDS)) {} return true; } } diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/payload/evolvable/batch/PipeTabletEventBatch.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/payload/evolvable/batch/PipeTabletEventBatch.java index 716d119..3d8d477 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/payload/evolvable/batch/PipeTabletEventBatch.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/payload/evolvable/batch/PipeTabletEventBatch.java @@ -19,18 +19,18 @@ package org.apache.iotdb.collector.plugin.builtin.sink.payload.evolvable.batch; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; - import org.apache.iotdb.collector.plugin.builtin.sink.event.tablet.PipeRawTabletInsertionEvent; -import org.apache.iotdb.collector.plugin.builtin.sink.protocol.thrift.async.IoTDBDataRegionAsyncConnector; import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + public abstract class PipeTabletEventBatch implements AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(PipeTabletEventBatch.class); @@ -62,7 +62,8 @@ public abstract class PipeTabletEventBatch implements AutoCloseable { // .setExpandCallback( // (oldMemory, newMemory) -> // LOGGER.info( - // "The batch size limit has expanded from {} to {}.", oldMemory, newMemory)); + // "The batch size limit has expanded from {} to {}.", oldMemory, + // newMemory)); if (getMaxBatchSizeInBytes() != requestMaxBatchSizeInBytes) { LOGGER.info( @@ -79,8 +80,7 @@ public abstract class PipeTabletEventBatch implements AutoCloseable { * @param event the given {@link Event} * @return {@code true} if the batch can be transferred */ - public synchronized boolean onEvent(final TabletInsertionEvent event) - throws IOException { + public synchronized boolean onEvent(final TabletInsertionEvent event) throws IOException { if (isClosed || !(event instanceof PipeRawTabletInsertionEvent)) { return false; } @@ -108,8 +108,7 @@ public abstract class PipeTabletEventBatch implements AutoCloseable { * cached and not emitted in this batch. If there are failure encountered, just throw * exceptions and do not return {@code false} here. */ - protected abstract boolean constructBatch(final TabletInsertionEvent event) - throws IOException; + protected abstract boolean constructBatch(final TabletInsertionEvent event) throws IOException; public boolean shouldEmit() { return totalBufferSize >= getMaxBatchSizeInBytes() diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java index 03e536b..ad9a1ad 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java @@ -19,23 +19,24 @@ package org.apache.iotdb.collector.plugin.builtin.sink.payload.evolvable.batch; -import java.io.DataOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - import org.apache.iotdb.collector.plugin.builtin.sink.event.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.collector.plugin.builtin.sink.payload.evolvable.request.PipeTransferTabletBatchReqV2; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; + import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.PublicBAOS; import org.apache.tsfile.utils.ReadWriteIOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + public class PipeTabletEventPlainBatch extends PipeTabletEventBatch { private static final Logger LOGGER = LoggerFactory.getLogger(PipeTabletEventPlainBatch.class); @@ -57,13 +58,13 @@ public class PipeTabletEventPlainBatch extends PipeTabletEventBatch { } @Override - protected boolean constructBatch(final TabletInsertionEvent event) - throws IOException { + protected boolean constructBatch(final TabletInsertionEvent event) throws IOException { final int bufferSize = buildTabletInsertionBuffer(event); totalBufferSize += bufferSize; pipe2BytesAccumulated.compute( new Pair<>( - ((PipeRawTabletInsertionEvent) event).getPipeName(), ((PipeRawTabletInsertionEvent) event).getCreationTime()), + ((PipeRawTabletInsertionEvent) event).getPipeName(), + ((PipeRawTabletInsertionEvent) event).getCreationTime()), (pipeName, bytesAccumulated) -> bytesAccumulated == null ? bufferSize : bytesAccumulated + bufferSize); return true; @@ -102,8 +103,7 @@ public class PipeTabletEventPlainBatch extends PipeTabletEventBatch { return pipe2BytesAccumulated; } - private int buildTabletInsertionBuffer(final TabletInsertionEvent event) - throws IOException { + private int buildTabletInsertionBuffer(final TabletInsertionEvent event) throws IOException { int databaseEstimateSize = 0; final ByteBuffer buffer; // if (event instanceof PipeInsertNodeTabletInsertionEvent) { @@ -130,7 +130,8 @@ public class PipeTabletEventPlainBatch extends PipeTabletEventBatch { // if (pipeInsertNodeTabletInsertionEvent.isTableModelEvent()) { // databaseEstimateSize = // pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName().length(); - // insertNodeDataBases.add(pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName()); + // + // insertNodeDataBases.add(pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName()); // } else { // databaseEstimateSize = 4; // insertNodeDataBases.add(TREE_MODEL_DATABASE_PLACEHOLDER); diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java index f1805cc..fcb577d 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java @@ -19,16 +19,6 @@ package org.apache.iotdb.collector.plugin.builtin.sink.payload.evolvable.batch; -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.atomic.AtomicLong; - import org.apache.iotdb.collector.plugin.builtin.sink.event.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.collector.plugin.builtin.sink.utils.PipeMemoryWeightUtil; import org.apache.iotdb.collector.plugin.builtin.sink.utils.builder.PipeTableModeTsFileBuilder; @@ -37,12 +27,23 @@ import org.apache.iotdb.collector.plugin.builtin.sink.utils.builder.PipeTsFileBu import org.apache.iotdb.collector.plugin.builtin.sink.utils.sorter.PipeTableModelTabletEventSorter; import org.apache.iotdb.collector.plugin.builtin.sink.utils.sorter.PipeTreeModelTabletEventSorter; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; + import org.apache.tsfile.exception.write.WriteProcessException; import org.apache.tsfile.utils.Pair; import org.apache.tsfile.write.record.Tablet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicLong; + public class PipeTabletEventTsFileBatch extends PipeTabletEventBatch { private static final Logger LOGGER = LoggerFactory.getLogger(PipeTabletEventTsFileBatch.class); @@ -92,7 +93,7 @@ public class PipeTabletEventTsFileBatch extends PipeTabletEventBatch { // } // } // } else - if (event instanceof PipeRawTabletInsertionEvent) { + if (event instanceof PipeRawTabletInsertionEvent) { final PipeRawTabletInsertionEvent rawTabletInsertionEvent = (PipeRawTabletInsertionEvent) event; final Tablet tablet = rawTabletInsertionEvent.convertToTablet();
