This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch pipe-table-model-3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/pipe-table-model-3 by this push:
     new c969186fcbd Update IoTDBDataRegionTableModelExtractor.java
c969186fcbd is described below

commit c969186fcbd2a477567a7a63b3de20e022586343
Author: Steve Yurong Su <[email protected]>
AuthorDate: Thu Sep 26 18:49:20 2024 +0800

    Update IoTDBDataRegionTableModelExtractor.java
---
 .../IoTDBDataRegionTableModelExtractor.java        | 234 ---------------------
 1 file changed, 234 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/table/extractor/dataregion/IoTDBDataRegionTableModelExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/table/extractor/dataregion/IoTDBDataRegionTableModelExtractor.java
index fa23d12a2e5..e1dd86ab9ee 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/table/extractor/dataregion/IoTDBDataRegionTableModelExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/table/extractor/dataregion/IoTDBDataRegionTableModelExtractor.java
@@ -19,33 +19,21 @@
 
 package org.apache.iotdb.db.pipe.table.extractor.dataregion;
 
-import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.exception.IllegalPathException;
-import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant;
-import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
 import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import org.apache.iotdb.db.pipe.extractor.dataregion.DataRegionListeningFilter;
-import 
org.apache.iotdb.db.pipe.extractor.dataregion.DataRegionWatermarkInjector;
 import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.historical.PipeHistoricalDataRegionTsFileExtractor;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionHeartbeatExtractor;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionHybridExtractor;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionLogExtractor;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionTsFileExtractor;
-import 
org.apache.iotdb.db.pipe.metric.PipeDataNodeRemainingEventAndTimeMetrics;
-import org.apache.iotdb.db.pipe.metric.PipeDataRegionExtractorMetrics;
-import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode;
-import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
-import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
-import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
 import org.apache.tsfile.utils.Pair;
@@ -53,8 +41,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Arrays;
-import java.util.Objects;
-import java.util.concurrent.atomic.AtomicReference;
 
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_END_TIME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_EXCLUSION_DEFAULT_VALUE;
@@ -82,8 +68,6 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstan
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_LOG_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_STREAM_MODE_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_START_TIME_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_WATERMARK_INTERVAL_DEFAULT_VALUE;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_WATERMARK_INTERVAL_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_END_TIME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_EXCLUSION_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_ENABLE_KEY;
@@ -95,7 +79,6 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstan
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_ENABLE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_MODE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_START_TIME_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_WATERMARK_INTERVAL_KEY;
 import static 
org.apache.iotdb.commons.pipe.datastructure.options.PipeInclusionOptions.hasAtLeastOneOption;
 import static 
org.apache.iotdb.commons.pipe.datastructure.options.PipeInclusionOptions.optionsAreAllLegal;
 
@@ -342,228 +325,11 @@ public class IoTDBDataRegionTableModelExtractor extends IoTDBDataRegionExtractor
     }
   }
 
-  @Override
-  public void customize(
-      final PipeParameters parameters, final PipeExtractorRuntimeConfiguration 
configuration)
-      throws Exception {
-    if (hasNoExtractionNeed) {
-      return;
-    }
-
-    final PipeTaskExtractorRuntimeEnvironment environment =
-        ((PipeTaskExtractorRuntimeEnvironment) 
configuration.getRuntimeEnvironment());
-    regionId = environment.getRegionId();
-    pipeName = environment.getPipeName();
-    creationTime = environment.getCreationTime();
-    taskID = pipeName + "_" + regionId + "_" + creationTime;
-    pipeTaskMeta = environment.getPipeTaskMeta();
-
-    isForwardingPipeRequests =
-        parameters.getBooleanOrDefault(
-            Arrays.asList(
-                PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY,
-                PipeExtractorConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY),
-            
PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE);
-
-    historicalExtractor.customize(parameters, configuration);
-    realtimeExtractor.customize(parameters, configuration);
-
-    // Set watermark injector
-    if (parameters.hasAnyAttributes(
-        EXTRACTOR_WATERMARK_INTERVAL_KEY, SOURCE_WATERMARK_INTERVAL_KEY)) {
-      final long watermarkIntervalInMs =
-          parameters.getLongOrDefault(
-              Arrays.asList(EXTRACTOR_WATERMARK_INTERVAL_KEY, 
SOURCE_WATERMARK_INTERVAL_KEY),
-              EXTRACTOR_WATERMARK_INTERVAL_DEFAULT_VALUE);
-      if (watermarkIntervalInMs > 0) {
-        watermarkInjector = new DataRegionWatermarkInjector(regionId, 
watermarkIntervalInMs);
-        LOGGER.info(
-            "Pipe {}@{}: Set watermark injector with interval {} ms.",
-            pipeName,
-            regionId,
-            watermarkInjector.getInjectionIntervalInMs());
-      }
-    }
-
-    // register metric after generating taskID
-    PipeDataRegionExtractorMetrics.getInstance().register(this);
-    PipeDataNodeRemainingEventAndTimeMetrics.getInstance().register(this);
-  }
-
-  @Override
-  public void start() throws Exception {
-    if (hasNoExtractionNeed || hasBeenStarted.get()) {
-      return;
-    }
-
-    final long startTime = System.currentTimeMillis();
-    LOGGER.info(
-        "Pipe {}@{}: Starting historical extractor {} and realtime extractor 
{}.",
-        pipeName,
-        regionId,
-        historicalExtractor.getClass().getSimpleName(),
-        realtimeExtractor.getClass().getSimpleName());
-
-    if (hasBeenStarted.get()) {
-      return;
-    }
-    hasBeenStarted.set(true);
-
-    final AtomicReference<Exception> exceptionHolder = new 
AtomicReference<>(null);
-    final DataRegionId dataRegionIdObject = new DataRegionId(this.regionId);
-    while (true) {
-      // try to start extractors in the data region ...
-      // first try to run if data region exists, then try to run if data 
region does not exist.
-      // both conditions fail is not common, which means the data region is 
created during the
-      // runIfPresent and runIfAbsent operations. in this case, we need to 
retry.
-      if (StorageEngine.getInstance()
-              .runIfPresent(
-                  dataRegionIdObject,
-                  (dataRegion -> {
-                    dataRegion.writeLock(
-                        String.format(
-                            "Pipe: starting %s", 
IoTDBDataRegionExtractor.class.getName()));
-                    try {
-                      
startHistoricalExtractorAndRealtimeExtractor(exceptionHolder);
-                    } finally {
-                      dataRegion.writeUnlock();
-                    }
-                  }))
-          || StorageEngine.getInstance()
-              .runIfAbsent(
-                  dataRegionIdObject,
-                  () -> 
startHistoricalExtractorAndRealtimeExtractor(exceptionHolder))) {
-        rethrowExceptionIfAny(exceptionHolder);
-
-        LOGGER.info(
-            "Pipe {}@{}: Started historical extractor {} and realtime 
extractor {} successfully within {} ms.",
-            pipeName,
-            regionId,
-            historicalExtractor.getClass().getSimpleName(),
-            realtimeExtractor.getClass().getSimpleName(),
-            System.currentTimeMillis() - startTime);
-        return;
-      }
-      rethrowExceptionIfAny(exceptionHolder);
-    }
-  }
-
-  private void startHistoricalExtractorAndRealtimeExtractor(
-      final AtomicReference<Exception> exceptionHolder) {
-    try {
-      // Start realtimeExtractor first to avoid losing data. This may cause 
some
-      // retransmission, yet it is OK according to the idempotency of IoTDB.
-      // Note: The order of historical collection is flushing data -> adding 
all tsFile events.
-      // There can still be writing when tsFile events are added. If we start
-      // realtimeExtractor after the process, then this part of data will be 
lost.
-      realtimeExtractor.start();
-      historicalExtractor.start();
-    } catch (final Exception e) {
-      exceptionHolder.set(e);
-      LOGGER.warn(
-          "Pipe {}@{}: Start historical extractor {} and realtime extractor {} 
error.",
-          pipeName,
-          regionId,
-          historicalExtractor.getClass().getSimpleName(),
-          realtimeExtractor.getClass().getSimpleName(),
-          e);
-    }
-  }
-
-  private void rethrowExceptionIfAny(final AtomicReference<Exception> 
exceptionHolder) {
-    if (exceptionHolder.get() != null) {
-      throw new PipeException("failed to start extractors.", 
exceptionHolder.get());
-    }
-  }
-
-  @Override
-  public Event supply() throws Exception {
-    if (hasNoExtractionNeed) {
-      return null;
-    }
-
-    Event event = null;
-    if (!historicalExtractor.hasConsumedAll()) {
-      event = historicalExtractor.supply();
-    } else {
-      if (Objects.nonNull(watermarkInjector)) {
-        event = watermarkInjector.inject();
-      }
-      if (Objects.isNull(event)) {
-        event = realtimeExtractor.supply();
-      }
-    }
-
-    if (Objects.nonNull(event)) {
-      if (event instanceof TabletInsertionEvent) {
-        PipeDataRegionExtractorMetrics.getInstance().markTabletEvent(taskID);
-      } else if (event instanceof TsFileInsertionEvent) {
-        PipeDataRegionExtractorMetrics.getInstance().markTsFileEvent(taskID);
-      } else if (event instanceof PipeHeartbeatEvent) {
-        
PipeDataRegionExtractorMetrics.getInstance().markPipeHeartbeatEvent(taskID);
-      }
-    }
-
-    return event;
-  }
-
-  @Override
-  public void close() throws Exception {
-    if (hasNoExtractionNeed || !hasBeenStarted.get()) {
-      return;
-    }
-
-    historicalExtractor.close();
-    realtimeExtractor.close();
-    if (Objects.nonNull(taskID)) {
-      PipeDataRegionExtractorMetrics.getInstance().deregister(taskID);
-    }
-  }
-
   //////////////////////////// APIs provided for detecting stuck 
////////////////////////////
 
-  @Override
-  public boolean shouldExtractInsertion() {
-    return shouldExtractInsertion;
-  }
-
   @Override
   public boolean isStreamMode() {
     return realtimeExtractor instanceof PipeRealtimeDataRegionHybridExtractor
         || realtimeExtractor instanceof PipeRealtimeDataRegionLogExtractor;
   }
-
-  @Override
-  public boolean hasConsumedAllHistoricalTsFiles() {
-    return historicalExtractor.hasConsumedAll();
-  }
-
-  //////////////////////////// APIs provided for metric framework 
////////////////////////////
-
-  @Override
-  public int getHistoricalTsFileInsertionEventCount() {
-    return hasBeenStarted.get() ? historicalExtractor.getPendingQueueSize() : 
0;
-  }
-
-  @Override
-  public int getTabletInsertionEventCount() {
-    return hasBeenStarted.get() ? 
realtimeExtractor.getTabletInsertionEventCount() : 0;
-  }
-
-  @Override
-  public int getRealtimeTsFileInsertionEventCount() {
-    return hasBeenStarted.get() ? 
realtimeExtractor.getTsFileInsertionEventCount() : 0;
-  }
-
-  @Override
-  public int getPipeHeartbeatEventCount() {
-    return hasBeenStarted.get() ? 
realtimeExtractor.getPipeHeartbeatEventCount() : 0;
-  }
-
-  @Override
-  public int getEventCount() {
-    return hasBeenStarted.get()
-        ? (historicalExtractor.getPendingQueueSize() + 
realtimeExtractor.getEventCount())
-        : 0;
-  }
 }

Reply via email to