This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch table_deletion_plus in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b9d305b2a66afdad45dcf1da0ea4d258f70171ab Author: Tian Jiang <[email protected]> AuthorDate: Wed Dec 4 19:04:14 2024 +0800 Supplement deletion-related tests and bug fixes --- .../relational/it/db/it/IoTDBDeletionTableIT.java | 327 ++++++++++++++++++--- .../execution/fragment/QueryContext.java | 18 +- .../db/queryengine/plan/analyze/AnalyzeUtils.java | 14 +- .../relational/analyzer/StatementAnalyzer.java | 3 - .../db/storageengine/dataregion/DataRegion.java | 32 +- .../execute/utils/CompactionPathUtils.java | 15 +- .../executor/fast/SeriesCompactionExecutor.java | 10 +- .../dataregion/memtable/TsFileProcessor.java | 10 +- .../modification/ModificationFileTest.java | 242 +++++++++++++-- 9 files changed, 573 insertions(+), 98 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBDeletionTableIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBDeletionTableIT.java index fd0a23c97b1..7dc359f6cf5 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBDeletionTableIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBDeletionTableIT.java @@ -32,6 +32,8 @@ import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.BufferedWriter; import java.io.FileWriter; @@ -43,6 +45,12 @@ import java.sql.Statement; import java.util.ArrayList; import java.util.List; import java.util.Locale; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import static org.junit.Assert.assertEquals; @@ -50,25 +58,24 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -@Ignore @RunWith(IoTDBTestRunner.class) @Category({TableLocalStandaloneIT.class, TableClusterIT.class}) public class IoTDBDeletionTableIT { - private static String[] creationSqls = + private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBDeletionTableIT.class); + private static final String[] creationSqls = new String[] { "CREATE DATABASE IF NOT EXISTS test", "USE test", "CREATE TABLE IF NOT EXISTS vehicle0(deviceId STRING ID, s0 INT32 MEASUREMENT, s1 INT64 MEASUREMENT, s2 FLOAT MEASUREMENT, s3 TEXT MEASUREMENT, s4 BOOLEAN MEASUREMENT)", }; - private String insertTemplate = + private final String insertTemplate = "INSERT INTO test.vehicle%d(time, deviceId, s0,s1,s2,s3,s4" + ") VALUES(%d,'d%d',%d,%d,%f,%s,%b)"; - private String deleteAllTemplate = "DROP TABLE IF EXISTS vehicle%d"; @BeforeClass - public static void setUp() throws Exception { + public static void setUp() { Locale.setDefault(Locale.ENGLISH); EnvFactory.getEnv() @@ -82,22 +89,18 @@ public class IoTDBDeletionTableIT { } @AfterClass - public static void tearDown() throws Exception { + public static void tearDown() { EnvFactory.getEnv().cleanClusterEnvironment(); } - /** - * Should delete this case after the deletion value filter feature be implemented - * - * @throws SQLException - */ + /** Should delete this case after the deletion value filter feature be implemented */ @Test public void testUnsupportedValueFilter() throws SQLException { try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); Statement statement = connection.createStatement()) { statement.execute("use test"); statement.execute( - "CREATE TABLE vehicle1(deviceId STRING ID, s0 INT32 MEASUREMENT, s1 INT64 MEASUREMENT, s2 FLOAT MEASUREMENT, s3 TEXT MEASUREMENT, s4 BOOLEAN MEASUREMENT)"); + "CREATE TABLE vehicle1(deviceId STRING ID, s0 INT32 MEASUREMENT, s1 INT64 MEASUREMENT, s2 FLOAT MEASUREMENT, s3 TEXT MEASUREMENT, s4 BOOLEAN MEASUREMENT, attr1 ATTRIBUTE)"); statement.execute("insert into vehicle1(time, deviceId, s0) values (10, 'd0', 310)"); statement.execute("insert into vehicle1(time, deviceId, s3) values (10, 'd0','text')"); @@ -110,6 +113,21 @@ public class IoTDBDeletionTableIT { assertEquals("701: The column 's0' does not exist or is not an id column", e.getMessage()); } + try { + statement.execute("DELETE FROM vehicle1 WHERE s1 = 'text'"); + fail("should not reach here!"); + } catch (SQLException e) { + assertEquals("701: The column 's1' does not exist or is not an id column", e.getMessage()); + } + + try { + statement.execute("DELETE FROM vehicle1 WHERE attr1 = 'text'"); + fail("should not reach here!"); + } catch (SQLException e) { + assertEquals( + "701: The column 'attr1' does not exist or is not an id column", e.getMessage()); + } + try { statement.execute("DELETE FROM vehicle1 WHERE s3 = 'text'"); fail("should not reach here!"); @@ -149,6 +167,20 @@ public class IoTDBDeletionTableIT { e.getMessage()); } + try { + statement.execute("DELETE FROM vehicle1 WHERE true"); + fail("should not reach here!"); + } catch (SQLException e) { + assertEquals("701: Unsupported expression: true in true", e.getMessage()); + } + + try { + statement.execute("DELETE FROM vehicleNonExist"); + fail("should not reach here!"); + } catch (SQLException e) { + assertEquals("701: Table vehiclenonexist not found", e.getMessage()); + } + try (ResultSet set = statement.executeQuery("SELECT s0 FROM vehicle1")) { int cnt = 0; while (set.next()) { @@ -237,8 +269,7 @@ public class IoTDBDeletionTableIT { statement.execute("CREATE DATABASE ln3"); statement.execute("use ln3"); statement.execute( - String.format( - "CREATE TABLE vehicle3(deviceId STRING ID, s0 INT32 MEASUREMENT, s1 INT64 MEASUREMENT, s2 FLOAT MEASUREMENT, s3 TEXT MEASUREMENT, s4 BOOLEAN MEASUREMENT)")); + "CREATE TABLE vehicle3(deviceId STRING ID, s0 INT32 MEASUREMENT, s1 INT64 MEASUREMENT, s2 FLOAT MEASUREMENT, s3 TEXT MEASUREMENT, s4 BOOLEAN MEASUREMENT)"); statement.execute( "INSERT INTO vehicle3(time, deviceId, s4) " + "values(1509465600000, 'd0', true)"); @@ -310,7 +341,6 @@ public class IoTDBDeletionTableIT { } cleanData(5); } catch (Exception e) { - e.printStackTrace(); fail(e.getMessage()); } } @@ -346,7 +376,7 @@ public class IoTDBDeletionTableIT { } @Test - public void testDelFlushingMemtable() throws SQLException { + public void testDelFlushingMemTable() throws SQLException { int testNum = 7; int deviceId = 0; try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); @@ -376,7 +406,7 @@ public class IoTDBDeletionTableIT { } @Test - public void testDelMultipleFlushingMemtable() throws SQLException { + public void testDelMultipleFlushingMemTable() throws SQLException { int testNum = 8; int deviceId = 0; try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); @@ -530,7 +560,7 @@ public class IoTDBDeletionTableIT { statement.execute("DROP TABLE vehicle" + testNum); - try (ResultSet set = statement.executeQuery("SELECT * FROM vehicle" + testNum)) { + try (ResultSet ignored = statement.executeQuery("SELECT * FROM vehicle" + testNum)) { fail("Exception expected"); } catch (SQLException e) { assertEquals("701: Table 'test.vehicle12' does not exist", e.getMessage()); @@ -561,33 +591,58 @@ public class IoTDBDeletionTableIT { } @Test - public void testMultiDevice() throws SQLException { + public void testSingleDeviceDeletionMultiExecution() throws SQLException { int testNum = 13; - prepareData(testNum, 2); + prepareData(testNum, 5); try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); Statement statement = connection.createStatement()) { statement.execute("use test"); - // init d0[1, 400] d1[1, 400] + // init d0[1, 400] d1[1, 400] d2[1, 400] d3[1, 400] d4[1, 400] - // remain d1[10, 400] + // remain d1[10, 400] d2[10, 400] d3[10, 400] d4[10, 400] statement.execute("DELETE FROM vehicle" + testNum + " WHERE time < 10 or deviceId = 'd0'"); + int[] expectedPointNumOfDevice = new int[] {0, 391, 391, 391, 391}; + checkDevicePoint(expectedPointNumOfDevice, statement, testNum); + + // remain d1[50, 400] d2[10, 400] d3[10, 400] d4[10, 400] + statement.execute("DELETE FROM vehicle" + testNum + " WHERE time < 50 and deviceId = 'd1'"); + expectedPointNumOfDevice = new int[] {0, 351, 391, 391, 391}; + checkDevicePoint(expectedPointNumOfDevice, statement, testNum); + + // remain d1[50, 400] d2[101, 400] d3[10, 400] d4[10, 400] + statement.execute( + "DELETE FROM vehicle" + testNum + " WHERE time <= 100 and deviceId = 'd2'"); + expectedPointNumOfDevice = new int[] {0, 351, 300, 391, 391}; + checkDevicePoint(expectedPointNumOfDevice, statement, testNum); + + // remain d1[50, 400] d2[101, 400] d3[301, 400] d4[10, 400] + statement.execute( + "DELETE FROM vehicle" + testNum + " WHERE time <= 300 and deviceId = 'd3'"); + expectedPointNumOfDevice = new int[] {0, 351, 300, 100, 391}; + checkDevicePoint(expectedPointNumOfDevice, statement, testNum); + + // remain d1[50, 400] d2[101, 400] d3[301, 400] d4[10, 100] + statement.execute("DELETE FROM vehicle" + testNum + " WHERE time > 100 and deviceId = 'd4'"); + expectedPointNumOfDevice = new int[] {0, 351, 300, 100, 91}; + checkDevicePoint(expectedPointNumOfDevice, statement, testNum); + } + cleanData(testNum); + } + private void checkDevicePoint(int[] expectedPointNumOfDevice, Statement statement, int testNum) + throws SQLException { + for (int i = 0; i < expectedPointNumOfDevice.length; i++) { try (ResultSet set = - statement.executeQuery("SELECT * FROM vehicle" + testNum + " where deviceId = 'd1'")) { + statement.executeQuery( + "SELECT * FROM vehicle" + testNum + " where deviceId = 'd" + i + "'")) { int cnt = 0; while (set.next()) { cnt++; } - assertEquals(391, cnt); - } - - try (ResultSet set = - statement.executeQuery("SELECT * FROM vehicle" + testNum + " where deviceId = 'd0'")) { - assertFalse(set.next()); + assertEquals(expectedPointNumOfDevice[i], cnt); } } - cleanData(testNum); } @Test @@ -657,19 +712,109 @@ public class IoTDBDeletionTableIT { } @Test - public void testIllegalRange() { - int testNum = 15; + public void testIllegalRange() throws SQLException { + int testNum = 16; try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); Statement statement = connection.createStatement()) { statement.execute("use test"); statement.execute( "create table t" + testNum + " (id1 string id, id2 string id, s1 int32 measurement)"); - statement.execute("delete from t" + testNum + " where time > 10 and time <= 1"); - fail("Exception expected"); - } catch (SQLException e) { - assertEquals("701: Start time 11 is greater than end time 1", e.getMessage()); + try { + statement.execute("delete from t" + testNum + " where time > 10 and time <= 1"); + fail("Exception expected"); + } catch (SQLException e) { + assertEquals("701: Start time 11 is greater than end time 1", e.getMessage()); + } + } + } + + @Test + public void testMultiDevicePartialDeletionMultiExecution() throws SQLException { + int testNum = 17; + prepareData(testNum, 5); + try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); + Statement statement = connection.createStatement()) { + statement.execute("use test"); + + // init d0[1, 400] d1[1, 400] d2[1, 400] d3[1, 400] d4[1, 400] + + // remain d0[10, 400] d1[10, 400] d2[1, 400] d3[1, 400] d4[1, 400] + statement.execute( + "DELETE FROM vehicle" + + testNum + + " WHERE time < 10 and (deviceId = 'd0' or deviceId = 'd1')"); + int[] expectedPointNumOfDevice = new int[] {391, 391, 400, 400, 400}; + checkDevicePoint(expectedPointNumOfDevice, statement, testNum); + + // remain d0[10, 400] d1[50, 400] d2[50, 400] d3[50, 400] d4[1, 400] + statement.execute( + "DELETE FROM vehicle" + + testNum + + " WHERE time < 50 and (deviceId = 'd1' or deviceId = 'd2' or deviceId = 'd3')"); + expectedPointNumOfDevice = new int[] {391, 351, 351, 351, 400}; + checkDevicePoint(expectedPointNumOfDevice, statement, testNum); + + // remain d0[101, 400] d1[50, 400] d2[101, 400] d3[101, 400] d4[101, 400] + statement.execute( + "DELETE FROM vehicle" + + testNum + + " WHERE time <= 100 and (deviceId = 'd2' or deviceId = 'd3' or deviceId = 'd4' or deviceId = 'd0')"); + expectedPointNumOfDevice = new int[] {300, 351, 300, 300, 300}; + checkDevicePoint(expectedPointNumOfDevice, statement, testNum); + + // remain d0[101, 150] d1[50, 150] d2[101, 150] d3[101, 150] d4[101, 150] + statement.execute( + "DELETE FROM vehicle" + + testNum + + " WHERE time > 150 and (deviceId = 'd2' or deviceId = 'd3' or deviceId = 'd4' or deviceId = 'd0' or deviceId = 'd1')"); + expectedPointNumOfDevice = new int[] {50, 101, 50, 50, 50}; + checkDevicePoint(expectedPointNumOfDevice, statement, testNum); + } + cleanData(testNum); + } + + @Test + public void testMultiDeviceFullDeletionMultiExecution() throws SQLException { + int testNum = 18; + prepareData(testNum, 5); + try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); + Statement statement = connection.createStatement()) { + statement.execute("use test"); + + // init d0[1, 400] d1[1, 400] d2[1, 400] d3[1, 400] d4[1, 400] + + // remain d2[1, 400] d3[1, 400] d4[1, 400] + statement.execute( + "DELETE FROM vehicle" + testNum + " WHERE (deviceId = 'd0' or deviceId = 'd1')"); + int[] expectedPointNumOfDevice = new int[] {0, 0, 400, 400, 400}; + checkDevicePoint(expectedPointNumOfDevice, statement, testNum); + + // remain d4[1, 400] + statement.execute( + "DELETE FROM vehicle" + + testNum + + " WHERE (deviceId = 'd1' or deviceId = 'd2' or deviceId = 'd3')"); + expectedPointNumOfDevice = new int[] {0, 0, 0, 0, 400}; + checkDevicePoint(expectedPointNumOfDevice, statement, testNum); + + // remain nothing + statement.execute( + "DELETE FROM vehicle" + + testNum + + " WHERE (deviceId = 'd2' or deviceId = 'd3' or deviceId = 'd4' or deviceId = 'd0')"); + expectedPointNumOfDevice = new int[] {0, 0, 0, 0, 0}; + checkDevicePoint(expectedPointNumOfDevice, statement, testNum); + + /// remain nothing + statement.execute( + "DELETE FROM vehicle" + + testNum + + " WHERE (deviceId = 'd2' or deviceId = 'd3' or deviceId = 'd4' or deviceId = 'd0' or deviceId = 'd1')"); + expectedPointNumOfDevice = new int[] {0, 0, 0, 0, 0}; + checkDevicePoint(expectedPointNumOfDevice, statement, testNum); } + cleanData(testNum); } @Test @@ -715,7 +860,110 @@ public class IoTDBDeletionTableIT { } } - @Ignore + @Ignore("long test") + @Test + public void testConcurrentFlushAndDeletion() throws InterruptedException, ExecutionException { + // try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); + // Statement statement = connection.createStatement()) { + // statement.execute("SET CONFIGURATION enable_seq_space_compaction='false'"); + // } catch (SQLException e) { + // throw new RuntimeException(e); + // } + + AtomicLong writtenPointCounter = new AtomicLong(); + ExecutorService threadPool = Executors.newCachedThreadPool(); + Future<Void> writeThread = + threadPool.submit(() -> concurrentWrite(writtenPointCounter, threadPool)); + Future<Void> deletionThread = + threadPool.submit(() -> concurrentDeletion(writtenPointCounter, threadPool)); + writeThread.get(); + deletionThread.get(); + threadPool.shutdown(); + boolean success = threadPool.awaitTermination(1, TimeUnit.MINUTES); + assertTrue(success); + } + + private Void concurrentWrite(AtomicLong writtenPointCounter, ExecutorService allThreads) + throws SQLException { + int fileNumMax = 10000; + int pointPerFile = 100; + + try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); + Statement statement = connection.createStatement()) { + + statement.execute("create database if not exists test"); + statement.execute("use test"); + + statement.execute("create table table1(deviceId STRING ID, s0 INT32 MEASUREMENT)"); + + for (int i = 1; i <= fileNumMax; i++) { + for (int j = 0; j < pointPerFile; j++) { + statement.execute( + String.format( + "INSERT INTO test.table1(time, deviceId, s0) VALUES(%d,'d0',%d)", + writtenPointCounter.get(), writtenPointCounter.get())); + writtenPointCounter.incrementAndGet(); + if (Thread.interrupted()) { + return null; + } + } + statement.execute("FLUSH"); + } + } catch (Throwable e) { + allThreads.shutdownNow(); + throw e; + } + return null; + } + + private Void concurrentDeletion(AtomicLong writtenPointCounter, ExecutorService allThreads) + throws SQLException, InterruptedException { + // delete every 10 points in 100 points + int deletionOffset = 0; + int deletionInterval = 100; + int deletionRange = 10; + long nextPointNumToDelete = deletionInterval; + // pointPerFile * fileNumMax + long deletionEnd = 100 * 10000; + + long deletedCnt = 0; + + try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); + Statement statement = connection.createStatement()) { + + statement.execute("create database if not exists test"); + statement.execute("use test"); + while (deletionOffset < deletionEnd && !Thread.interrupted()) { + if (writtenPointCounter.get() >= nextPointNumToDelete) { + statement.execute( + "delete from test.table1 where time >= " + + deletionOffset + + " and time < " + + (deletionOffset + deletionRange)); + deletedCnt += deletionRange; + LOGGER.info("{} points deleted", deletedCnt); + + try (ResultSet set = + statement.executeQuery( + "select count(*) from table1 where time < " + nextPointNumToDelete)) { + assertTrue(set.next()); + assertEquals(nextPointNumToDelete * 9 / 10, set.getLong(1)); + } + deletionOffset += deletionInterval; + nextPointNumToDelete += deletionInterval; + + } else { + Thread.sleep(10); + } + } + } catch (Throwable e) { + allThreads.shutdownNow(); + throw e; + } + return null; + } + + @Ignore("performance") @Test public void testDeletionWritePerformance() throws SQLException, IOException { int fileNumMax = 10000; @@ -765,7 +1013,7 @@ public class IoTDBDeletionTableIT { } } - @Ignore + @Ignore("performance") @Test public void testDeletionReadPerformance() throws SQLException, IOException { int fileNumMax = 100; @@ -838,7 +1086,7 @@ public class IoTDBDeletionTableIT { statement.execute(sql); } } catch (Exception e) { - e.printStackTrace(); + fail(e.getMessage()); } } @@ -893,6 +1141,7 @@ public class IoTDBDeletionTableIT { try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); Statement statement = connection.createStatement()) { statement.execute("use test"); + String deleteAllTemplate = "DROP TABLE IF EXISTS vehicle%d"; statement.execute(String.format(deleteAllTemplate, testNum)); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java index 566e998d2c4..4a296565e47 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java @@ -119,8 +119,7 @@ public class QueryContext { } List<ModEntry> modEntries = - ModificationUtils.sortAndMerge( - getAllModifications(tsFileResource).getOverlapped(deviceID, measurement)); + getAllModifications(tsFileResource).getOverlapped(deviceID, measurement); if (deviceID.isTableModel()) { // the pattern tree has false-positive for table model deletion, so we do a further // filtering @@ -129,6 +128,8 @@ public class QueryContext { .filter(mod -> mod.affects(deviceID) && mod.affects(measurement)) .collect(Collectors.toList()); } + modEntries = ModificationUtils.sortAndMerge(modEntries); + return modEntries; } @@ -138,9 +139,16 @@ public class QueryContext { if (!checkIfModificationExists(tsFileResource)) { return Collections.emptyList(); } - - return ModificationUtils.sortAndMerge( - getAllModifications(tsFileResource).getDeviceOverlapped(new PartialPath(deviceID))); + List<ModEntry> modEntries = + getAllModifications(tsFileResource).getOverlapped(new PartialPath(deviceID)); + if (deviceID.isTableModel()) { + // the pattern tree has false-positive for table model deletion, so we do a further + // filtering + modEntries = + modEntries.stream().filter(mod -> mod.affects(deviceID)).collect(Collectors.toList()); + } + modEntries = ModificationUtils.sortAndMerge(modEntries); + return modEntries; } /** diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtils.java index b997e1e886c..c40509232b4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtils.java @@ -411,14 +411,14 @@ public class AnalyzeUtils { List<Expression> leftList, List<Expression> rightList, Operator operator) { List<Expression> results = new ArrayList<>(); for (Expression leftExp : leftList) { - List<Expression> terms = new ArrayList<>(); - if (leftExp instanceof LogicalExpression) { - terms.addAll(((LogicalExpression) leftExp).getTerms()); - } else { - terms.add(leftExp); - } - for (Expression rightExp : rightList) { + List<Expression> terms = new ArrayList<>(); + if (leftExp instanceof LogicalExpression) { + terms.addAll(((LogicalExpression) leftExp).getTerms()); + } else { + terms.add(leftExp); + } + if (rightExp instanceof LogicalExpression) { terms.addAll(((LogicalExpression) rightExp).getTerms()); } else { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java index 928354dee10..d4c32235a75 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java @@ -533,9 +533,6 @@ public class StatementAnalyzer { @Override protected Scope visitDelete(Delete node, Optional<Scope> scope) { - if (true) { - throw new SemanticException("Delete statement is not supported yet."); - } final Scope ret = Scope.create(); AnalyzeUtils.analyzeDelete(node, queryContext); analysis.setScope(node, ret); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index d583eafd9d9..167eff855b8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -2252,6 +2252,8 @@ public class DataRegion implements IDataRegionForQuery { } List<TableDeletionEntry> modEntries = node.getModEntries(); + logger.info("[Deletion] Executing table deletion {}", node); + writeLock("delete"); boolean hasReleasedLock = false; try { @@ -2275,7 +2277,9 @@ public class DataRegion implements IDataRegionForQuery { unsealedTsFileResource, modEntry.getStartTime(), modEntry.getEndTime()); + logger.info("[Deletion] unsealed files for {}: {}", unsealedTsFileResource, modEntry); deleteDataInUnsealedFiles(unsealedTsFileResource, modEntry, sealedTsFileResource); + logger.info("[Deletion] sealed files for {}: {}", sealedTsFileResource, modEntry); sealedTsFileResourceLists.add(sealedTsFileResource); } @@ -2439,6 +2443,12 @@ public class DataRegion implements IDataRegionForQuery { if (!ModificationUtils.overlap( deletion.getStartTime(), deletion.getEndTime(), fileStartTime, fileEndTime)) { + logger.info( + "[Deletion] {} skipped {}, file time [{}, {}]", + deletion, + tsFileResource, + fileStartTime, + fileEndTime); return true; } ITimeIndex timeIndex = tsFileResource.getTimeIndex(); @@ -2456,6 +2466,7 @@ public class DataRegion implements IDataRegionForQuery { return false; } } + logger.info("[Deletion] {} skipped {}, file time {}", deletion, tsFileResource, timeIndex); return true; } @@ -2479,7 +2490,9 @@ public class DataRegion implements IDataRegionForQuery { tsFileResource.getProcessor().getFlushQueryLock().writeLock().unlock(); } else { try { - tsFileResource.getProcessor().deleteDataInMemory(deletion); + if (!tsFileResource.getProcessor().deleteDataInMemory(deletion)) { + sealedTsFiles.add(tsFileResource); + } // else do nothing } finally { tsFileResource.getProcessor().getFlushQueryLock().writeLock().unlock(); } @@ -2502,6 +2515,11 @@ public class DataRegion implements IDataRegionForQuery { involvedModificationFiles.add(sealedTsFile.getModFileForWrite()); } + if (involvedModificationFiles.isEmpty()) { + logger.info("[Deletion] Deletion {} does not involve any file", deletion); + return; + } + List<Exception> exceptions = involvedModificationFiles.parallelStream() .map( @@ -2526,6 +2544,10 @@ public class DataRegion implements IDataRegionForQuery { "Multiple errors occurred while writing mod files, see logs for details."); } } + logger.info( + "[Deletion] Deletion {} is written into {} mod files", + deletion, + involvedModificationFiles.size()); } private void deleteDataDirectlyInFile(List<TsFileResource> tsfileResourceList, ModEntry modEntry) @@ -2537,15 +2559,13 @@ public class DataRegion implements IDataRegionForQuery { // can be deleted by mods. Set<ModificationFile> involvedModificationFiles = new HashSet<>(); for (TsFileResource tsFileResource : deletedByMods) { - if (tsFileResource.isClosed()) { + if (tsFileResource.isClosed() + || !tsFileResource.getProcessor().deleteDataInMemory(modEntry)) { if (tsFileResource.isCompacting()) { involvedModificationFiles.add(tsFileResource.getCompactionModFile()); } involvedModificationFiles.add(tsFileResource.getModFileForWrite()); - } else { - // delete data in memory of unsealed file - tsFileResource.getProcessor().deleteDataInMemory(modEntry); - } + } // else do nothing } for (ModificationFile involvedModificationFile : involvedModificationFiles) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionPathUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionPathUtils.java index 2ef2c742dd7..711effdc3f1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionPathUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionPathUtils.java @@ -21,9 +21,7 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.path.PartialPath; -import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache; -import org.apache.tsfile.common.constant.TsFileConstant; import org.apache.tsfile.file.metadata.IDeviceID; public class CompactionPathUtils { @@ -32,17 +30,6 @@ public class CompactionPathUtils { public static PartialPath getPath(IDeviceID device, String measurement) throws IllegalPathException { - return getPath(device).concatAsMeasurementPath(measurement); - } - - public static PartialPath getPath(IDeviceID device) throws IllegalPathException { - PartialPath path; - String plainDeviceId = device.toString(); - if (plainDeviceId.contains(TsFileConstant.BACK_QUOTE_STRING)) { - path = DataNodeDevicePathCache.getInstance().getPartialPath(plainDeviceId); - } else { - path = new PartialPath(plainDeviceId.split(TsFileConstant.PATH_SEPARATER_NO_REGEX)); - } - return path; + return new PartialPath(device).concatAsMeasurementPath(measurement); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/SeriesCompactionExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/SeriesCompactionExecutor.java index 0dc75857e04..83dab2e7690 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/SeriesCompactionExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/SeriesCompactionExecutor.java @@ -50,6 +50,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.PriorityQueue; +import java.util.stream.Collectors; public abstract class SeriesCompactionExecutor { @@ -478,7 +479,14 @@ public abstract class SeriesCompactionExecutor { if (allModifications == null) { return Collections.emptyList(); } - return ModificationUtils.sortAndMerge(allModifications.getOverlapped(path)); + List<ModEntry> modEntries = allModifications.getOverlapped(path); + if (path.getIDeviceID().isTableModel()) { + modEntries = + modEntries.stream() + .filter(e -> e.affects(path.getIDeviceID()) && e.affects(path.getMeasurement())) + .collect(Collectors.toList()); + } + return ModificationUtils.sortAndMerge(modEntries); } @SuppressWarnings("squid:S3776") diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java index f5c176521aa..eba4df95cc4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java @@ -1153,22 +1153,30 @@ public class TsFileProcessor { * <= 'timestamp' in the deletion. <br> * * <p>Delete data in both working MemTable and flushing MemTables. + * + * @return true if data in MemTable is successfully deleted, false otherwise */ - public void deleteDataInMemory(ModEntry deletion) { + @SuppressWarnings("BooleanMethodIsAlwaysInverted") + public boolean deleteDataInMemory(ModEntry deletion) { flushQueryLock.writeLock().lock(); logFlushQueryWriteLocked(); try { + boolean deleted = false; if (workMemTable != null) { long pointDeleted = workMemTable.delete(deletion); logger.info( "[Deletion] Deletion with {} in workMemTable, {} points deleted", deletion, pointDeleted); + deleted = true; } // Flushing memTables are immutable, only record this deletion in these memTables for read if (!flushingMemTables.isEmpty()) { + logger.info("[Deletion] Deletion with {} in flushingMemTable", deletion); modsToMemtable.add(new Pair<>(deletion, flushingMemTables.getLast())); + deleted = true; } + return deleted; } finally { flushQueryLock.writeLock().unlock(); logFlushQueryWriteUnlocked(); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFileTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFileTest.java index 6a1f5097633..f12b4fd3556 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFileTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFileTest.java @@ -21,8 +21,13 @@ package org.apache.iotdb.db.storageengine.dataregion.modification; import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.recover.CompactionRecoverManager; +import org.apache.iotdb.db.storageengine.dataregion.modification.IDPredicate.FullExactMatch; +import org.apache.iotdb.db.storageengine.dataregion.modification.IDPredicate.NOP; +import org.apache.iotdb.db.storageengine.dataregion.modification.IDPredicate.SegmentExactMatch; import org.apache.iotdb.db.utils.constant.TestConstant; +import org.apache.tsfile.file.metadata.IDeviceID.Factory; +import org.apache.tsfile.read.common.TimeRange; import org.junit.Assert; import org.junit.Test; @@ -30,6 +35,7 @@ import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import static org.junit.Assert.assertEquals; @@ -46,22 +52,32 @@ public class ModificationFileTest { new TreeDeletionEntry(new MeasurementPath(new String[] {"d1", "s1"}), 1), new TreeDeletionEntry(new MeasurementPath(new String[] {"d1", "s2"}), 2), new TreeDeletionEntry(new MeasurementPath(new String[] {"d1", "s3"}), 3, 4), - new TreeDeletionEntry(new MeasurementPath(new String[] {"d1", "s41"}), 4, 5) + new TreeDeletionEntry(new MeasurementPath(new String[] {"d1", "s41"}), 4, 5), + new TableDeletionEntry(new DeletionPredicate("table1", new NOP()), new TimeRange(1, 2)), + new TableDeletionEntry( + new DeletionPredicate("table2", new SegmentExactMatch("id11", 0)), + new TimeRange(3, 4)), + new TableDeletionEntry( + new DeletionPredicate( + "table3", + new FullExactMatch(Factory.DEFAULT_FACTORY.create(new String[] {"id1", "id2"}))), + new TimeRange(5, 6)), + new TableDeletionEntry(new DeletionPredicate("table4"), new TimeRange(7, 8)), }; try (ModificationFile mFile = new ModificationFile(tempFileName)) { - for (int i = 0; i < 2; i++) { + for (int i = 0; i < 4; i++) { mFile.write(modifications[i]); } List<ModEntry> modificationList = mFile.getAllMods(); - for (int i = 0; i < 2; i++) { + for (int i = 0; i < 4; i++) { assertEquals(modifications[i], modificationList.get(i)); } - for (int i = 2; i < 4; i++) { + for (int i = 4; i < 8; i++) { mFile.write(modifications[i]); } modificationList = mFile.getAllMods(); - for (int i = 0; i < 4; i++) { + for (int i = 0; i < 8; i++) { assertEquals(modifications[i], modificationList.get(i)); } } catch (IOException e) { @@ -77,14 +93,23 @@ public class ModificationFileTest { ModEntry[] modifications = new ModEntry[] { new TreeDeletionEntry(new MeasurementPath(new String[] {"d1", "s1"}), 1), - new TreeDeletionEntry(new MeasurementPath(new String[] {"d1", "s2"}), 2) + new TreeDeletionEntry(new MeasurementPath(new String[] {"d1", "s2"}), 2), + new TableDeletionEntry(new DeletionPredicate("table1", new NOP()), new TimeRange(1, 2)), + new TableDeletionEntry( + new DeletionPredicate("table2", new SegmentExactMatch("id11", 0)), + new TimeRange(3, 4)), + new TableDeletionEntry( + new DeletionPredicate( + "table3", + new FullExactMatch(Factory.DEFAULT_FACTORY.create(new String[] {"id1", "id2"}))), + new TimeRange(5, 6)), + new TableDeletionEntry(new DeletionPredicate("table4"), new TimeRange(7, 8)), }; try (ModificationFile mFile = new ModificationFile(tempFileName)) { - mFile.write(modifications[0]); - mFile.write(modifications[1]); + mFile.write(Arrays.asList(modifications)); List<ModEntry> modificationList = mFile.getAllMods(); - assertEquals(2, modificationList.size()); - for (int i = 0; i < 2; i++) { + assertEquals(modifications.length, modificationList.size()); + for (int i = 0; i < modifications.length; i++) { assertEquals(modifications[i], modificationList.get(i)); } } catch (IOException e) { @@ -199,9 +224,116 @@ public class ModificationFileTest { } } + @Test + public void testCompact05() { + String tempFileName = TestConstant.BASE_OUTPUT_PATH.concat("compact01.mods"); + long time = 1000; + try (ModificationFile modificationFile = new ModificationFile(tempFileName)) { + while (modificationFile.getFileLength() < 1024 * 1024) { + modificationFile.write( + new TableDeletionEntry( + new DeletionPredicate("table1", new NOP()), + new TimeRange(Long.MIN_VALUE, time += 5000))); + } + modificationFile.compact(); + List<ModEntry> modificationList = new ArrayList<>(modificationFile.getAllMods()); + assertEquals(1, modificationList.size()); + + ModEntry deletion = modificationList.get(0); + assertEquals(time, deletion.getEndTime()); + assertEquals(Long.MIN_VALUE, deletion.getStartTime()); + } catch (IOException e) { + fail(e.getMessage()); + } finally { + new File(tempFileName).delete(); + } + } + + @Test + public void testCompact06() { + String tempFileName = TestConstant.BASE_OUTPUT_PATH.concat("compact02.mods"); + long time = 1000; + try (ModificationFile modificationFile = new ModificationFile(tempFileName)) { + while (modificationFile.getFileLength() < 1024 * 100) { + modificationFile.write( + new TableDeletionEntry( + new DeletionPredicate("table1", new NOP()), + new TimeRange(Long.MIN_VALUE, time += 5000))); + } + modificationFile.compact(); + List<ModEntry> modificationList = new ArrayList<>(modificationFile.getAllMods()); + assertTrue(modificationList.size() > 1); + } catch (IOException e) { + fail(e.getMessage()); + } finally { + new File(tempFileName).delete(); + } + } + + // test if file size greater than 1M. + @Test + public void testCompact07() { + String tempFileName = TestConstant.BASE_OUTPUT_PATH.concat("compact03.mods"); + try (ModificationFile modificationFile = new ModificationFile(tempFileName)) { + while (modificationFile.getFileLength() < 1024 * 1024) { + modificationFile.write( + new TableDeletionEntry( + new DeletionPredicate("table1", new NOP()), + new TimeRange(Long.MIN_VALUE, Long.MAX_VALUE))); + } + modificationFile.compact(); + List<ModEntry> modificationList = new ArrayList<>(modificationFile.getAllMods()); + assertEquals(1, modificationList.size()); + + ModEntry deletion = modificationList.get(0); + assertEquals(Long.MAX_VALUE, deletion.getEndTime()); + assertEquals(Long.MIN_VALUE, deletion.getStartTime()); + } catch (IOException e) { + fail(e.getMessage()); + } finally { + new File(tempFileName).delete(); + } + } + + @Test + public void testCompact08() { + String tempFileName = TestConstant.BASE_OUTPUT_PATH.concat("compact04.mods"); + try (ModificationFile modificationFile = new ModificationFile(tempFileName)) { + long time = 0; + while (modificationFile.getFileLength() < 1024 * 1024) { + for (int i = 0; i < 5; i++) { + ModEntry[] modEntries = { + new TableDeletionEntry( + new DeletionPredicate("table1", new NOP()), + new TimeRange(Long.MIN_VALUE, time += 5000)), + new TableDeletionEntry( + new DeletionPredicate("table2", new SegmentExactMatch("id11", 0)), + new TimeRange(Long.MIN_VALUE, time += 5000)), + new TableDeletionEntry( + new DeletionPredicate( + "table3", + new FullExactMatch( + Factory.DEFAULT_FACTORY.create(new String[] {"id1", "id2"}))), + new TimeRange(Long.MIN_VALUE, time += 5000)), + new TableDeletionEntry( + new DeletionPredicate("table4"), new TimeRange(Long.MIN_VALUE, time += 5000)) + }; + modificationFile.write(Arrays.asList(modEntries)); + } + } + modificationFile.compact(); + List<ModEntry> modificationList = new ArrayList<>(modificationFile.getAllMods()); + assertEquals(4, modificationList.size()); + } catch (IOException e) { + fail(e.getMessage()); + } finally { + new File(tempFileName).delete(); + } + } + // test mods file and mods settle file both exists @Test - public void testRecover01() { + public void testRecover01() throws IOException { String modsFileName = TestConstant.BASE_OUTPUT_PATH.concat("compact01.mods"); String modsSettleFileName = TestConstant.BASE_OUTPUT_PATH.concat("compact01.mods.settle"); @@ -228,17 +360,13 @@ public class ModificationFileTest { } catch (IOException e) { throw new RuntimeException(e); } finally { - try { - Files.delete(new File(modsFileName).toPath()); - } catch (IOException e) { - throw new RuntimeException(e); - } + Files.delete(new File(modsFileName).toPath()); } } // test only mods settle file exists @Test - public void testRecover02() { + public void testRecover02() throws IOException { String modsSettleFileName = TestConstant.BASE_OUTPUT_PATH.concat("compact02.mods.settle"); String originModsFileName = TestConstant.BASE_OUTPUT_PATH.concat("compact02.mods"); try (ModificationFile modsSettleFile = new ModificationFile(modsSettleFileName)) { @@ -255,11 +383,81 @@ public class ModificationFileTest { } catch (IOException e) { throw new RuntimeException(e); } finally { - try { - Files.delete(new File(originModsFileName).toPath()); - } catch (IOException e) { - throw new RuntimeException(e); - } + Files.delete(new File(originModsFileName).toPath()); + } + } + + @Test + public void testRecover03() throws IOException { + String modsFileName = TestConstant.BASE_OUTPUT_PATH.concat("compact01.mods"); + String modsSettleFileName = TestConstant.BASE_OUTPUT_PATH.concat("compact01.mods.settle"); + + long time = 0; + try (ModificationFile modsFile = new ModificationFile(modsFileName); + ModificationFile modsSettleFile = new ModificationFile(modsSettleFileName)) { + + ModEntry[] modEntries = { + new TableDeletionEntry( + new DeletionPredicate("table1", new NOP()), + new TimeRange(Long.MIN_VALUE, time += 5000)), + new TableDeletionEntry( + new DeletionPredicate("table2", new SegmentExactMatch("id11", 0)), + new TimeRange(Long.MIN_VALUE, time += 5000)), + new TableDeletionEntry( + new DeletionPredicate( + "table3", + new FullExactMatch(Factory.DEFAULT_FACTORY.create(new String[] {"id1", "id2"}))), + new TimeRange(Long.MIN_VALUE, time += 5000)), + new TableDeletionEntry( + new DeletionPredicate("table4"), new TimeRange(Long.MIN_VALUE, time += 5000)) + }; + modsFile.write(Arrays.asList(modEntries)); + + modsFile.close(); + modsSettleFile.close(); + new CompactionRecoverManager(null, null, null) + .recoverModSettleFile(new File(TestConstant.BASE_OUTPUT_PATH).toPath()); + Assert.assertTrue(modsFile.exists()); + Assert.assertFalse(modsSettleFile.getFileLength() > 0); + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + Files.delete(new File(modsFileName).toPath()); + } + } + + @Test + public void testRecover04() throws IOException { + String modsSettleFileName = TestConstant.BASE_OUTPUT_PATH.concat("compact02.mods.settle"); + String originModsFileName = TestConstant.BASE_OUTPUT_PATH.concat("compact02.mods"); + + long time = 0; + try (ModificationFile modsSettleFile = new ModificationFile(modsSettleFileName)) { + ModEntry[] modEntries = { + new TableDeletionEntry( + new DeletionPredicate("table1", new NOP()), + new TimeRange(Long.MIN_VALUE, time += 5000)), + new TableDeletionEntry( + new DeletionPredicate("table2", new SegmentExactMatch("id11", 0)), + new TimeRange(Long.MIN_VALUE, time += 5000)), + new TableDeletionEntry( + new DeletionPredicate( + "table3", + new FullExactMatch(Factory.DEFAULT_FACTORY.create(new String[] {"id1", "id2"}))), + new TimeRange(Long.MIN_VALUE, time += 5000)), + new TableDeletionEntry( + new DeletionPredicate("table4"), new TimeRange(Long.MIN_VALUE, time += 5000)) + }; + modsSettleFile.write(Arrays.asList(modEntries)); + modsSettleFile.close(); + new CompactionRecoverManager(null, null, null) + .recoverModSettleFile(new File(TestConstant.BASE_OUTPUT_PATH).toPath()); + Assert.assertFalse(modsSettleFile.getFileLength() > 0); + Assert.assertTrue(new File(originModsFileName).exists()); + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + Files.delete(new File(originModsFileName).toPath()); } } }
