This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 39b28d38aeb Load: Do not auto create series that are completely
deleted by mods (#13182)
39b28d38aeb is described below
commit 39b28d38aeb718c16e9057c801990ef1ff5be2f7
Author: Zikun Ma <[email protected]>
AuthorDate: Tue Aug 27 20:41:53 2024 +0800
Load: Do not auto create series that are completely deleted by mods (#13182)
Consider the modification file in the analyze stage of Load, and skip
time-series that are completely deleted by mods during auto creation.
The implementation uses the statistics in TsFileResource.timeIndex (if
present) and TimeseriesMetadata to judge whether a device or time-series
is completely deleted.
---
.../org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java | 1 -
.../pipe/it/autocreate/IoTDBPipeWithLoadIT.java | 138 +++++++++++++++++++++
.../plan/analyze/LoadTsFileAnalyzer.java | 89 +++++++++++++
.../apache/iotdb/db/utils/ModificationUtils.java | 40 ++++++
4 files changed, 267 insertions(+), 1 deletion(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java
index 115ba4bfc90..abd745aa26b 100644
---
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java
@@ -761,7 +761,6 @@ public class IoTDBLoadTsFileIT {
generator.generateData(SchemaConfig.DEVICE_0, 100000, PARTITION_INTERVAL
/ 10_000, false);
generator.generateData(SchemaConfig.DEVICE_1, 100000, PARTITION_INTERVAL
/ 10_000, true);
generator.generateDeletion(SchemaConfig.DEVICE_0, 10);
- generator.generateDeletion(SchemaConfig.DEVICE_1, 10);
writtenPoint1 = generator.getTotalNumber();
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeWithLoadIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeWithLoadIT.java
new file mode 100644
index 00000000000..45e40331b38
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeWithLoadIT.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.it.autocreate;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.it.env.MultiEnvFactory;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.MultiClusterIT2AutoCreateSchema;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT2AutoCreateSchema.class})
+public class IoTDBPipeWithLoadIT extends AbstractPipeDualAutoIT {
+
+ @Before
+ public void setUp() {
+ MultiEnvFactory.createEnv(2);
+ senderEnv = MultiEnvFactory.getEnv(0);
+ receiverEnv = MultiEnvFactory.getEnv(1);
+
+ // TODO: delete ratis configurations
+ senderEnv
+ .getConfig()
+ .getCommonConfig()
+ .setAutoCreateSchemaEnabled(true)
+ .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+ // Disable sender compaction to test mods
+ .setEnableSeqSpaceCompaction(false)
+ .setEnableUnseqSpaceCompaction(false)
+ .setEnableCrossSpaceCompaction(false);
+ receiverEnv
+ .getConfig()
+ .getCommonConfig()
+ .setAutoCreateSchemaEnabled(true)
+ .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS);
+
+ // 10 min, assert that the operations will not time out
+ senderEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);
+ receiverEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);
+
+ senderEnv.initClusterEnvironment();
+ receiverEnv.initClusterEnvironment();
+ }
+
+ /**
+ * Test that when the receiver loads data from TsFile, it will not load
timeseries that are
+ * completely deleted by mods.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testReceiverNotLoadDeletedTimeseries() throws Exception {
+ final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+ final String receiverIp = receiverDataNode.getIp();
+ final int receiverPort = receiverDataNode.getPort();
+
+ final Map<String, String> extractorAttributes = new HashMap<>();
+ final Map<String, String> processorAttributes = new HashMap<>();
+ final Map<String, String> connectorAttributes = new HashMap<>();
+
+ // Enable mods transfer
+ extractorAttributes.put("source.mods.enable", "true");
+
+ connectorAttributes.put("connector.batch.enable", "false");
+ connectorAttributes.put("connector.ip", receiverIp);
+ connectorAttributes.put("connector.port", Integer.toString(receiverPort));
+
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
+ // Generate TsFile and mods on sender. There are 6 time-series in total.
+ // Time-series not affected by mods: d1.s1, d2.s1
+ // Time-series partially deleted by mods: d1.s2, d3.s1
+ // Time-series completely deleted by mods: d1.s3, d4.s1 (should not be
loaded by receiver)
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv,
+ Arrays.asList(
+ "insert into root.db.d1 (time, s1, s2, s3) values (1, 1, 1, 1),
(3, 3, 3, 3)",
+ "insert into root.db.d2 (time, s1) values (1, 1), (3, 3)",
+ "insert into root.db.d3 (time, s1) values (1, 1), (3, 3)",
+ "insert into root.db.d4 (time, s1) values (1, 1), (3, 3)",
+ "flush",
+ "delete from root.db.d1.s2 where time <= 2",
+ "delete from root.db.d1.s3 where time >= 1 and time <= 3",
+ "delete from root.db.d3.** where time <= 2",
+ "delete from root.db.d4.** where time >= 1 and time <= 3",
+ "flush"))) {
+ return;
+ }
+
+ TSStatus status =
+ client.createPipe(
+ new TCreatePipeReq("p1", connectorAttributes)
+ .setExtractorAttributes(extractorAttributes)
+ .setProcessorAttributes(processorAttributes));
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("p1").getCode());
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv, "count timeseries", "count(timeseries),",
Collections.singleton("4,"));
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
index 4e2a0240d35..34f99c64388 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
@@ -57,11 +57,16 @@ import
org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.DatabaseSchemaStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowDatabaseStatement;
+import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
+import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
+import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeIndex;
+import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex;
import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils;
import
org.apache.iotdb.db.storageengine.load.memory.LoadTsFileAnalyzeSchemaMemoryBlock;
import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileMemoryManager;
+import org.apache.iotdb.db.utils.ModificationUtils;
import org.apache.iotdb.db.utils.TimestampPrecisionUtils;
import org.apache.iotdb.db.utils.constant.SqlConstant;
import org.apache.iotdb.rpc.RpcUtils;
@@ -88,6 +93,7 @@ import java.io.IOException;
import java.nio.BufferUnderflowException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -244,6 +250,8 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
tsFileResource.deserialize();
}
+
schemaAutoCreatorAndVerifier.setCurrentModificationsAndTimeIndex(tsFileResource);
+
// check if the tsfile is empty
if (!timeseriesMetadataIterator.hasNext()) {
throw new LoadEmptyFileException(tsFile.getAbsolutePath());
@@ -309,6 +317,10 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
this.schemaCache = new LoadTsFileAnalyzeSchemaCache();
}
+ public void setCurrentModificationsAndTimeIndex(TsFileResource resource)
throws IOException {
+ schemaCache.setCurrentModificationsAndTimeIndex(resource);
+ }
+
public void autoCreateAndVerify(
TsFileSequenceReader reader,
Map<IDeviceID, List<TimeseriesMetadata>> device2TimeseriesMetadataList)
@@ -317,7 +329,30 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
device2TimeseriesMetadataList.entrySet()) {
final IDeviceID device = entry.getKey();
+ try {
+ if (schemaCache.isDeviceDeletedByMods(device)) {
+ continue;
+ }
+ } catch (IllegalPathException e) {
+ LOGGER.warn(
+ "Failed to check if device {} is deleted by mods. Will see it as
not deleted.",
+ device,
+ e);
+ }
+
for (final TimeseriesMetadata timeseriesMetadata : entry.getValue()) {
+ try {
+ if (schemaCache.isTimeseriesDeletedByMods(device,
timeseriesMetadata)) {
+ continue;
+ }
+ } catch (IllegalPathException e) {
+ LOGGER.warn(
+ "Failed to check if device {}, timeseries {} is deleted by
mods. Will see it as not deleted.",
+ device,
+ timeseriesMetadata.getMeasurementId(),
+ e);
+ }
+
final TSDataType dataType = timeseriesMetadata.getTsDataType();
if (TSDataType.VECTOR.equals(dataType)) {
schemaCache
@@ -681,9 +716,14 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
private Map<IDeviceID, Boolean> tsFileDevice2IsAligned;
private Set<PartialPath> alreadySetDatabases;
+ private Collection<Modification> currentModifications;
+ private ITimeIndex currentTimeIndex;
+
private long batchDevice2TimeSeriesSchemasMemoryUsageSizeInBytes = 0;
private long tsFileDevice2IsAlignedMemoryUsageSizeInBytes = 0;
private long alreadySetDatabasesMemoryUsageSizeInBytes = 0;
+ private long currentModificationsMemoryUsageSizeInBytes = 0;
+ private long currentTimeIndexMemoryUsageSizeInBytes = 0;
private int currentBatchTimeSeriesCount = 0;
@@ -694,6 +734,7 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
this.currentBatchDevice2TimeSeriesSchemas = new HashMap<>();
this.tsFileDevice2IsAligned = new HashMap<>();
this.alreadySetDatabases = new HashSet<>();
+ this.currentModifications = new ArrayList<>();
}
public Map<IDeviceID, Set<MeasurementSchema>> getDevice2TimeSeries() {
@@ -749,6 +790,44 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
}
}
+ public void setCurrentModificationsAndTimeIndex(TsFileResource resource)
throws IOException {
+ clearModificationsAndTimeIndex();
+
+ currentModifications = resource.getModFile().getModifications();
+ for (final Modification modification : currentModifications) {
+ currentModificationsMemoryUsageSizeInBytes += ((Deletion)
modification).getSerializedSize();
+ }
+ block.addMemoryUsage(currentModificationsMemoryUsageSizeInBytes);
+
+ if (resource.resourceFileExists()) {
+ currentTimeIndex = resource.getTimeIndex();
+ if (currentTimeIndex instanceof FileTimeIndex) {
+ currentTimeIndex = resource.buildDeviceTimeIndex();
+ }
+ currentTimeIndexMemoryUsageSizeInBytes =
currentTimeIndex.calculateRamSize();
+ block.addMemoryUsage(currentTimeIndexMemoryUsageSizeInBytes);
+ }
+ }
+
+ public boolean isDeviceDeletedByMods(IDeviceID device) throws
IllegalPathException {
+ return currentTimeIndex != null
+ && ModificationUtils.isDeviceDeletedByMods(
+ currentModifications,
+ device,
+ currentTimeIndex.getStartTime(device),
+ currentTimeIndex.getEndTime(device));
+ }
+
+ public boolean isTimeseriesDeletedByMods(
+ IDeviceID device, TimeseriesMetadata timeseriesMetadata) throws
IllegalPathException {
+ return ModificationUtils.isTimeseriesDeletedByMods(
+ currentModifications,
+ device,
+ timeseriesMetadata.getMeasurementId(),
+ timeseriesMetadata.getStatistics().getStartTime(),
+ timeseriesMetadata.getStatistics().getEndTime());
+ }
+
public void addAlreadySetDatabase(PartialPath database) {
long memoryUsageSizeInBytes = 0;
if (alreadySetDatabases.add(database)) {
@@ -778,6 +857,15 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
currentBatchTimeSeriesCount = 0;
}
+ public void clearModificationsAndTimeIndex() {
+ currentModifications.clear();
+ currentTimeIndex = null;
+ block.reduceMemoryUsage(currentModificationsMemoryUsageSizeInBytes);
+ block.reduceMemoryUsage(currentTimeIndexMemoryUsageSizeInBytes);
+ currentModificationsMemoryUsageSizeInBytes = 0;
+ currentTimeIndexMemoryUsageSizeInBytes = 0;
+ }
+
public void clearAlignedCache() {
tsFileDevice2IsAligned.clear();
block.reduceMemoryUsage(tsFileDevice2IsAlignedMemoryUsageSizeInBytes);
@@ -815,6 +903,7 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
public void close() {
clearTimeSeries();
+ clearModificationsAndTimeIndex();
clearAlignedCache();
clearDatabasesCache();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ModificationUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ModificationUtils.java
index 934549dd062..9e7b6f7ecb8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ModificationUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ModificationUtils.java
@@ -19,6 +19,11 @@
package org.apache.iotdb.db.utils;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.commons.path.PartialPath;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.impl.SettleSelectorImpl;
import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable;
import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
@@ -30,6 +35,7 @@ import org.apache.tsfile.read.common.TimeRange;
import org.apache.tsfile.utils.Pair;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
public class ModificationUtils {
@@ -181,6 +187,40 @@ public class ModificationUtils {
return isPointDeleted(timestamp, deletionList, deleteCursor);
}
+ /**
+ * Check whether the device with start time and end time is completely
deleted by mods or not.
+ * There are some slight differences from that in {@link SettleSelectorImpl}.
+ */
+ public static boolean isDeviceDeletedByMods(
+ Collection<Modification> modifications, IDeviceID device, long
startTime, long endTime)
+ throws IllegalPathException {
+ for (Modification modification : modifications) {
+ PartialPath path = modification.getPath();
+ if (path.include(new MeasurementPath(device,
IoTDBConstant.ONE_LEVEL_PATH_WILDCARD))
+ && ((Deletion) modification).getTimeRange().contains(startTime,
endTime)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public static boolean isTimeseriesDeletedByMods(
+ Collection<Modification> modifications,
+ IDeviceID device,
+ String timeseriesId,
+ long startTime,
+ long endTime)
+ throws IllegalPathException {
+ for (Modification modification : modifications) {
+ PartialPath path = modification.getPath();
+ if (path.include(new MeasurementPath(device, timeseriesId))
+ && ((Deletion) modification).getTimeRange().contains(startTime,
endTime)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
private static void doModifyChunkMetaData(Modification modification,
IChunkMetadata metaData) {
if (modification instanceof Deletion) {
Deletion deletion = (Deletion) modification;