This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch pipe-table-model-3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/pipe-table-model-3 by this
push:
new c969186fcbd Update IoTDBDataRegionTableModelExtractor.java
c969186fcbd is described below
commit c969186fcbd2a477567a7a63b3de20e022586343
Author: Steve Yurong Su <[email protected]>
AuthorDate: Thu Sep 26 18:49:20 2024 +0800
Update IoTDBDataRegionTableModelExtractor.java
---
.../IoTDBDataRegionTableModelExtractor.java | 234 ---------------------
1 file changed, 234 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/table/extractor/dataregion/IoTDBDataRegionTableModelExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/table/extractor/dataregion/IoTDBDataRegionTableModelExtractor.java
index fa23d12a2e5..e1dd86ab9ee 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/table/extractor/dataregion/IoTDBDataRegionTableModelExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/table/extractor/dataregion/IoTDBDataRegionTableModelExtractor.java
@@ -19,33 +19,21 @@
package org.apache.iotdb.db.pipe.table.extractor.dataregion;
-import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.exception.IllegalPathException;
-import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant;
-import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.extractor.dataregion.DataRegionListeningFilter;
-import
org.apache.iotdb.db.pipe.extractor.dataregion.DataRegionWatermarkInjector;
import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
import
org.apache.iotdb.db.pipe.extractor.dataregion.historical.PipeHistoricalDataRegionTsFileExtractor;
import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionHeartbeatExtractor;
import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionHybridExtractor;
import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionLogExtractor;
import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionTsFileExtractor;
-import
org.apache.iotdb.db.pipe.metric.PipeDataNodeRemainingEventAndTimeMetrics;
-import org.apache.iotdb.db.pipe.metric.PipeDataRegionExtractorMetrics;
-import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode;
-import
org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
-import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
-import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.tsfile.utils.Pair;
@@ -53,8 +41,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
-import java.util.Objects;
-import java.util.concurrent.atomic.AtomicReference;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_END_TIME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_EXCLUSION_DEFAULT_VALUE;
@@ -82,8 +68,6 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstan
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_LOG_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_STREAM_MODE_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_START_TIME_KEY;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_WATERMARK_INTERVAL_DEFAULT_VALUE;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_WATERMARK_INTERVAL_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_END_TIME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_EXCLUSION_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_ENABLE_KEY;
@@ -95,7 +79,6 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstan
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_ENABLE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_MODE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_START_TIME_KEY;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_WATERMARK_INTERVAL_KEY;
import static
org.apache.iotdb.commons.pipe.datastructure.options.PipeInclusionOptions.hasAtLeastOneOption;
import static
org.apache.iotdb.commons.pipe.datastructure.options.PipeInclusionOptions.optionsAreAllLegal;
@@ -342,228 +325,11 @@ public class IoTDBDataRegionTableModelExtractor extends
IoTDBDataRegionExtractor
}
}
- @Override
- public void customize(
- final PipeParameters parameters, final PipeExtractorRuntimeConfiguration
configuration)
- throws Exception {
- if (hasNoExtractionNeed) {
- return;
- }
-
- final PipeTaskExtractorRuntimeEnvironment environment =
- ((PipeTaskExtractorRuntimeEnvironment)
configuration.getRuntimeEnvironment());
- regionId = environment.getRegionId();
- pipeName = environment.getPipeName();
- creationTime = environment.getCreationTime();
- taskID = pipeName + "_" + regionId + "_" + creationTime;
- pipeTaskMeta = environment.getPipeTaskMeta();
-
- isForwardingPipeRequests =
- parameters.getBooleanOrDefault(
- Arrays.asList(
- PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY,
- PipeExtractorConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY),
-
PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE);
-
- historicalExtractor.customize(parameters, configuration);
- realtimeExtractor.customize(parameters, configuration);
-
- // Set watermark injector
- if (parameters.hasAnyAttributes(
- EXTRACTOR_WATERMARK_INTERVAL_KEY, SOURCE_WATERMARK_INTERVAL_KEY)) {
- final long watermarkIntervalInMs =
- parameters.getLongOrDefault(
- Arrays.asList(EXTRACTOR_WATERMARK_INTERVAL_KEY,
SOURCE_WATERMARK_INTERVAL_KEY),
- EXTRACTOR_WATERMARK_INTERVAL_DEFAULT_VALUE);
- if (watermarkIntervalInMs > 0) {
- watermarkInjector = new DataRegionWatermarkInjector(regionId,
watermarkIntervalInMs);
- LOGGER.info(
- "Pipe {}@{}: Set watermark injector with interval {} ms.",
- pipeName,
- regionId,
- watermarkInjector.getInjectionIntervalInMs());
- }
- }
-
- // register metric after generating taskID
- PipeDataRegionExtractorMetrics.getInstance().register(this);
- PipeDataNodeRemainingEventAndTimeMetrics.getInstance().register(this);
- }
-
- @Override
- public void start() throws Exception {
- if (hasNoExtractionNeed || hasBeenStarted.get()) {
- return;
- }
-
- final long startTime = System.currentTimeMillis();
- LOGGER.info(
- "Pipe {}@{}: Starting historical extractor {} and realtime extractor
{}.",
- pipeName,
- regionId,
- historicalExtractor.getClass().getSimpleName(),
- realtimeExtractor.getClass().getSimpleName());
-
- if (hasBeenStarted.get()) {
- return;
- }
- hasBeenStarted.set(true);
-
- final AtomicReference<Exception> exceptionHolder = new
AtomicReference<>(null);
- final DataRegionId dataRegionIdObject = new DataRegionId(this.regionId);
- while (true) {
- // try to start extractors in the data region ...
- // first try to run if data region exists, then try to run if data
region does not exist.
- // both conditions fail is not common, which means the data region is
created during the
- // runIfPresent and runIfAbsent operations. in this case, we need to
retry.
- if (StorageEngine.getInstance()
- .runIfPresent(
- dataRegionIdObject,
- (dataRegion -> {
- dataRegion.writeLock(
- String.format(
- "Pipe: starting %s",
IoTDBDataRegionExtractor.class.getName()));
- try {
-
startHistoricalExtractorAndRealtimeExtractor(exceptionHolder);
- } finally {
- dataRegion.writeUnlock();
- }
- }))
- || StorageEngine.getInstance()
- .runIfAbsent(
- dataRegionIdObject,
- () ->
startHistoricalExtractorAndRealtimeExtractor(exceptionHolder))) {
- rethrowExceptionIfAny(exceptionHolder);
-
- LOGGER.info(
- "Pipe {}@{}: Started historical extractor {} and realtime
extractor {} successfully within {} ms.",
- pipeName,
- regionId,
- historicalExtractor.getClass().getSimpleName(),
- realtimeExtractor.getClass().getSimpleName(),
- System.currentTimeMillis() - startTime);
- return;
- }
- rethrowExceptionIfAny(exceptionHolder);
- }
- }
-
- private void startHistoricalExtractorAndRealtimeExtractor(
- final AtomicReference<Exception> exceptionHolder) {
- try {
- // Start realtimeExtractor first to avoid losing data. This may cause
some
- // retransmission, yet it is OK according to the idempotency of IoTDB.
- // Note: The order of historical collection is flushing data -> adding
all tsFile events.
- // There can still be writing when tsFile events are added. If we start
- // realtimeExtractor after the process, then this part of data will be
lost.
- realtimeExtractor.start();
- historicalExtractor.start();
- } catch (final Exception e) {
- exceptionHolder.set(e);
- LOGGER.warn(
- "Pipe {}@{}: Start historical extractor {} and realtime extractor {}
error.",
- pipeName,
- regionId,
- historicalExtractor.getClass().getSimpleName(),
- realtimeExtractor.getClass().getSimpleName(),
- e);
- }
- }
-
- private void rethrowExceptionIfAny(final AtomicReference<Exception>
exceptionHolder) {
- if (exceptionHolder.get() != null) {
- throw new PipeException("failed to start extractors.",
exceptionHolder.get());
- }
- }
-
- @Override
- public Event supply() throws Exception {
- if (hasNoExtractionNeed) {
- return null;
- }
-
- Event event = null;
- if (!historicalExtractor.hasConsumedAll()) {
- event = historicalExtractor.supply();
- } else {
- if (Objects.nonNull(watermarkInjector)) {
- event = watermarkInjector.inject();
- }
- if (Objects.isNull(event)) {
- event = realtimeExtractor.supply();
- }
- }
-
- if (Objects.nonNull(event)) {
- if (event instanceof TabletInsertionEvent) {
- PipeDataRegionExtractorMetrics.getInstance().markTabletEvent(taskID);
- } else if (event instanceof TsFileInsertionEvent) {
- PipeDataRegionExtractorMetrics.getInstance().markTsFileEvent(taskID);
- } else if (event instanceof PipeHeartbeatEvent) {
-
PipeDataRegionExtractorMetrics.getInstance().markPipeHeartbeatEvent(taskID);
- }
- }
-
- return event;
- }
-
- @Override
- public void close() throws Exception {
- if (hasNoExtractionNeed || !hasBeenStarted.get()) {
- return;
- }
-
- historicalExtractor.close();
- realtimeExtractor.close();
- if (Objects.nonNull(taskID)) {
- PipeDataRegionExtractorMetrics.getInstance().deregister(taskID);
- }
- }
-
//////////////////////////// APIs provided for detecting stuck
////////////////////////////
- @Override
- public boolean shouldExtractInsertion() {
- return shouldExtractInsertion;
- }
-
@Override
public boolean isStreamMode() {
return realtimeExtractor instanceof PipeRealtimeDataRegionHybridExtractor
|| realtimeExtractor instanceof PipeRealtimeDataRegionLogExtractor;
}
-
- @Override
- public boolean hasConsumedAllHistoricalTsFiles() {
- return historicalExtractor.hasConsumedAll();
- }
-
- //////////////////////////// APIs provided for metric framework
////////////////////////////
-
- @Override
- public int getHistoricalTsFileInsertionEventCount() {
- return hasBeenStarted.get() ? historicalExtractor.getPendingQueueSize() :
0;
- }
-
- @Override
- public int getTabletInsertionEventCount() {
- return hasBeenStarted.get() ?
realtimeExtractor.getTabletInsertionEventCount() : 0;
- }
-
- @Override
- public int getRealtimeTsFileInsertionEventCount() {
- return hasBeenStarted.get() ?
realtimeExtractor.getTsFileInsertionEventCount() : 0;
- }
-
- @Override
- public int getPipeHeartbeatEventCount() {
- return hasBeenStarted.get() ?
realtimeExtractor.getPipeHeartbeatEventCount() : 0;
- }
-
- @Override
- public int getEventCount() {
- return hasBeenStarted.get()
- ? (historicalExtractor.getPendingQueueSize() +
realtimeExtractor.getEventCount())
- : 0;
- }
}