This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 39b28d38aeb Load: Do not auto create series that are completely 
deleted by mods (#13182)
39b28d38aeb is described below

commit 39b28d38aeb718c16e9057c801990ef1ff5be2f7
Author: Zikun Ma <[email protected]>
AuthorDate: Tue Aug 27 20:41:53 2024 +0800

    Load: Do not auto create series that are completely deleted by mods (#13182)
    
    Consider the modification (mods) file during the analyze stage of Load, and
skip time-series that are completely deleted by mods during auto creation.
    
    The implementation uses the statistics in TsFileResource.timeIndex (if
present) and TimeseriesMetadata to determine whether a device or time-series
is completely deleted.
---
 .../org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java  |   1 -
 .../pipe/it/autocreate/IoTDBPipeWithLoadIT.java    | 138 +++++++++++++++++++++
 .../plan/analyze/LoadTsFileAnalyzer.java           |  89 +++++++++++++
 .../apache/iotdb/db/utils/ModificationUtils.java   |  40 ++++++
 4 files changed, 267 insertions(+), 1 deletion(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java 
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java
index 115ba4bfc90..abd745aa26b 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java
@@ -761,7 +761,6 @@ public class IoTDBLoadTsFileIT {
       generator.generateData(SchemaConfig.DEVICE_0, 100000, PARTITION_INTERVAL 
/ 10_000, false);
       generator.generateData(SchemaConfig.DEVICE_1, 100000, PARTITION_INTERVAL 
/ 10_000, true);
       generator.generateDeletion(SchemaConfig.DEVICE_0, 10);
-      generator.generateDeletion(SchemaConfig.DEVICE_1, 10);
       writtenPoint1 = generator.getTotalNumber();
     }
 
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeWithLoadIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeWithLoadIT.java
new file mode 100644
index 00000000000..45e40331b38
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeWithLoadIT.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.it.autocreate;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.it.env.MultiEnvFactory;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.MultiClusterIT2AutoCreateSchema;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT2AutoCreateSchema.class})
+public class IoTDBPipeWithLoadIT extends AbstractPipeDualAutoIT {
+
+  @Before
+  public void setUp() {
+    MultiEnvFactory.createEnv(2);
+    senderEnv = MultiEnvFactory.getEnv(0);
+    receiverEnv = MultiEnvFactory.getEnv(1);
+
+    // TODO: delete ratis configurations
+    senderEnv
+        .getConfig()
+        .getCommonConfig()
+        .setAutoCreateSchemaEnabled(true)
+        .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        // Disable sender compaction to test mods
+        .setEnableSeqSpaceCompaction(false)
+        .setEnableUnseqSpaceCompaction(false)
+        .setEnableCrossSpaceCompaction(false);
+    receiverEnv
+        .getConfig()
+        .getCommonConfig()
+        .setAutoCreateSchemaEnabled(true)
+        .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS);
+
+    // 10 min, assert that the operations will not time out
+    senderEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);
+    receiverEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);
+
+    senderEnv.initClusterEnvironment();
+    receiverEnv.initClusterEnvironment();
+  }
+
+  /**
+   * Test that when the receiver loads data from TsFile, it will not load 
timeseries that are
+   * completed deleted by mods.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testReceiverNotLoadDeletedTimeseries() throws Exception {
+    final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+    final String receiverIp = receiverDataNode.getIp();
+    final int receiverPort = receiverDataNode.getPort();
+
+    final Map<String, String> extractorAttributes = new HashMap<>();
+    final Map<String, String> processorAttributes = new HashMap<>();
+    final Map<String, String> connectorAttributes = new HashMap<>();
+
+    // Enable mods transfer
+    extractorAttributes.put("source.mods.enable", "true");
+
+    connectorAttributes.put("connector.batch.enable", "false");
+    connectorAttributes.put("connector.ip", receiverIp);
+    connectorAttributes.put("connector.port", Integer.toString(receiverPort));
+
+    try (final SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
+      // Generate TsFile and mods on sender. There are 6 time-series in total.
+      // Time-series not affected by mods: d1.s1, d2.s1
+      // Time-series partially deleted by mods: d1.s2, d3.s1
+      // Time-series completely deleted by mods: d1.s3, d4.s1 (should not be 
loaded by receiver)
+      if (!TestUtils.tryExecuteNonQueriesWithRetry(
+          senderEnv,
+          Arrays.asList(
+              "insert into root.db.d1 (time, s1, s2, s3) values (1, 1, 1, 1), 
(3, 3, 3, 3)",
+              "insert into root.db.d2 (time, s1) values (1, 1), (3, 3)",
+              "insert into root.db.d3 (time, s1) values (1, 1), (3, 3)",
+              "insert into root.db.d4 (time, s1) values (1, 1), (3, 3)",
+              "flush",
+              "delete from root.db.d1.s2 where time <= 2",
+              "delete from root.db.d1.s3 where time >= 1 and time <= 3",
+              "delete from root.db.d3.** where time <= 2",
+              "delete from root.db.d4.** where time >= 1 and time <= 3",
+              "flush"))) {
+        return;
+      }
+
+      TSStatus status =
+          client.createPipe(
+              new TCreatePipeReq("p1", connectorAttributes)
+                  .setExtractorAttributes(extractorAttributes)
+                  .setProcessorAttributes(processorAttributes));
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
status.getCode());
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("p1").getCode());
+
+      TestUtils.assertDataEventuallyOnEnv(
+          receiverEnv, "count timeseries", "count(timeseries),", 
Collections.singleton("4,"));
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
index 4e2a0240d35..34f99c64388 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
@@ -57,11 +57,16 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.Statement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.DatabaseSchemaStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowDatabaseStatement;
+import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
+import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
+import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeIndex;
+import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex;
 import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils;
 import 
org.apache.iotdb.db.storageengine.load.memory.LoadTsFileAnalyzeSchemaMemoryBlock;
 import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileMemoryManager;
+import org.apache.iotdb.db.utils.ModificationUtils;
 import org.apache.iotdb.db.utils.TimestampPrecisionUtils;
 import org.apache.iotdb.db.utils.constant.SqlConstant;
 import org.apache.iotdb.rpc.RpcUtils;
@@ -88,6 +93,7 @@ import java.io.IOException;
 import java.nio.BufferUnderflowException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -244,6 +250,8 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
         tsFileResource.deserialize();
       }
 
+      
schemaAutoCreatorAndVerifier.setCurrentModificationsAndTimeIndex(tsFileResource);
+
       // check if the tsfile is empty
       if (!timeseriesMetadataIterator.hasNext()) {
         throw new LoadEmptyFileException(tsFile.getAbsolutePath());
@@ -309,6 +317,10 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
       this.schemaCache = new LoadTsFileAnalyzeSchemaCache();
     }
 
+    public void setCurrentModificationsAndTimeIndex(TsFileResource resource) 
throws IOException {
+      schemaCache.setCurrentModificationsAndTimeIndex(resource);
+    }
+
     public void autoCreateAndVerify(
         TsFileSequenceReader reader,
         Map<IDeviceID, List<TimeseriesMetadata>> device2TimeseriesMetadataList)
@@ -317,7 +329,30 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
           device2TimeseriesMetadataList.entrySet()) {
         final IDeviceID device = entry.getKey();
 
+        try {
+          if (schemaCache.isDeviceDeletedByMods(device)) {
+            continue;
+          }
+        } catch (IllegalPathException e) {
+          LOGGER.warn(
+              "Failed to check if device {} is deleted by mods. Will see it as 
not deleted.",
+              device,
+              e);
+        }
+
         for (final TimeseriesMetadata timeseriesMetadata : entry.getValue()) {
+          try {
+            if (schemaCache.isTimeseriesDeletedByMods(device, 
timeseriesMetadata)) {
+              continue;
+            }
+          } catch (IllegalPathException e) {
+            LOGGER.warn(
+                "Failed to check if device {}, timeseries {} is deleted by 
mods. Will see it as not deleted.",
+                device,
+                timeseriesMetadata.getMeasurementId(),
+                e);
+          }
+
           final TSDataType dataType = timeseriesMetadata.getTsDataType();
           if (TSDataType.VECTOR.equals(dataType)) {
             schemaCache
@@ -681,9 +716,14 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
     private Map<IDeviceID, Boolean> tsFileDevice2IsAligned;
     private Set<PartialPath> alreadySetDatabases;
 
+    private Collection<Modification> currentModifications;
+    private ITimeIndex currentTimeIndex;
+
     private long batchDevice2TimeSeriesSchemasMemoryUsageSizeInBytes = 0;
     private long tsFileDevice2IsAlignedMemoryUsageSizeInBytes = 0;
     private long alreadySetDatabasesMemoryUsageSizeInBytes = 0;
+    private long currentModificationsMemoryUsageSizeInBytes = 0;
+    private long currentTimeIndexMemoryUsageSizeInBytes = 0;
 
     private int currentBatchTimeSeriesCount = 0;
 
@@ -694,6 +734,7 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
       this.currentBatchDevice2TimeSeriesSchemas = new HashMap<>();
       this.tsFileDevice2IsAligned = new HashMap<>();
       this.alreadySetDatabases = new HashSet<>();
+      this.currentModifications = new ArrayList<>();
     }
 
     public Map<IDeviceID, Set<MeasurementSchema>> getDevice2TimeSeries() {
@@ -749,6 +790,44 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
       }
     }
 
+    public void setCurrentModificationsAndTimeIndex(TsFileResource resource) 
throws IOException {
+      clearModificationsAndTimeIndex();
+
+      currentModifications = resource.getModFile().getModifications();
+      for (final Modification modification : currentModifications) {
+        currentModificationsMemoryUsageSizeInBytes += ((Deletion) 
modification).getSerializedSize();
+      }
+      block.addMemoryUsage(currentModificationsMemoryUsageSizeInBytes);
+
+      if (resource.resourceFileExists()) {
+        currentTimeIndex = resource.getTimeIndex();
+        if (currentTimeIndex instanceof FileTimeIndex) {
+          currentTimeIndex = resource.buildDeviceTimeIndex();
+        }
+        currentTimeIndexMemoryUsageSizeInBytes = 
currentTimeIndex.calculateRamSize();
+        block.addMemoryUsage(currentTimeIndexMemoryUsageSizeInBytes);
+      }
+    }
+
+    public boolean isDeviceDeletedByMods(IDeviceID device) throws 
IllegalPathException {
+      return currentTimeIndex != null
+          && ModificationUtils.isDeviceDeletedByMods(
+              currentModifications,
+              device,
+              currentTimeIndex.getStartTime(device),
+              currentTimeIndex.getEndTime(device));
+    }
+
+    public boolean isTimeseriesDeletedByMods(
+        IDeviceID device, TimeseriesMetadata timeseriesMetadata) throws 
IllegalPathException {
+      return ModificationUtils.isTimeseriesDeletedByMods(
+          currentModifications,
+          device,
+          timeseriesMetadata.getMeasurementId(),
+          timeseriesMetadata.getStatistics().getStartTime(),
+          timeseriesMetadata.getStatistics().getEndTime());
+    }
+
     public void addAlreadySetDatabase(PartialPath database) {
       long memoryUsageSizeInBytes = 0;
       if (alreadySetDatabases.add(database)) {
@@ -778,6 +857,15 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
       currentBatchTimeSeriesCount = 0;
     }
 
+    public void clearModificationsAndTimeIndex() {
+      currentModifications.clear();
+      currentTimeIndex = null;
+      block.reduceMemoryUsage(currentModificationsMemoryUsageSizeInBytes);
+      block.reduceMemoryUsage(currentTimeIndexMemoryUsageSizeInBytes);
+      currentModificationsMemoryUsageSizeInBytes = 0;
+      currentTimeIndexMemoryUsageSizeInBytes = 0;
+    }
+
     public void clearAlignedCache() {
       tsFileDevice2IsAligned.clear();
       block.reduceMemoryUsage(tsFileDevice2IsAlignedMemoryUsageSizeInBytes);
@@ -815,6 +903,7 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
 
     public void close() {
       clearTimeSeries();
+      clearModificationsAndTimeIndex();
       clearAlignedCache();
       clearDatabasesCache();
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ModificationUtils.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ModificationUtils.java
index 934549dd062..9e7b6f7ecb8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ModificationUtils.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ModificationUtils.java
@@ -19,6 +19,11 @@
 
 package org.apache.iotdb.db.utils;
 
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.commons.path.PartialPath;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.impl.SettleSelectorImpl;
 import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable;
 import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
 import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
@@ -30,6 +35,7 @@ import org.apache.tsfile.read.common.TimeRange;
 import org.apache.tsfile.utils.Pair;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 
 public class ModificationUtils {
@@ -181,6 +187,40 @@ public class ModificationUtils {
     return isPointDeleted(timestamp, deletionList, deleteCursor);
   }
 
+  /**
+   * Check whether the device with start time and end time is completely 
deleted by mods or not.
+   * There are some slight differences from that in {@link SettleSelectorImpl}.
+   */
+  public static boolean isDeviceDeletedByMods(
+      Collection<Modification> modifications, IDeviceID device, long 
startTime, long endTime)
+      throws IllegalPathException {
+    for (Modification modification : modifications) {
+      PartialPath path = modification.getPath();
+      if (path.include(new MeasurementPath(device, 
IoTDBConstant.ONE_LEVEL_PATH_WILDCARD))
+          && ((Deletion) modification).getTimeRange().contains(startTime, 
endTime)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  public static boolean isTimeseriesDeletedByMods(
+      Collection<Modification> modifications,
+      IDeviceID device,
+      String timeseriesId,
+      long startTime,
+      long endTime)
+      throws IllegalPathException {
+    for (Modification modification : modifications) {
+      PartialPath path = modification.getPath();
+      if (path.include(new MeasurementPath(device, timeseriesId))
+          && ((Deletion) modification).getTimeRange().contains(startTime, 
endTime)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   private static void doModifyChunkMetaData(Modification modification, 
IChunkMetadata metaData) {
     if (modification instanceof Deletion) {
       Deletion deletion = (Deletion) modification;

Reply via email to