This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch test-1 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 96f32ae7380b861335cf82857c75f630b3cb2378 Author: HTHou <hao...@apache.org> AuthorDate: Fri Jun 20 11:16:48 2025 +0800 dev test-1 --- .../main/java/org/apache/iotdb/SessionExample.java | 110 ++++++++++----------- .../java/org/apache/iotdb/session/Session.java | 9 ++ .../apache/iotdb/session/SessionConnection.java | 6 ++ .../protocol/thrift/impl/ClientRPCServiceImpl.java | 13 +++ .../dataregion/memtable/TsFileProcessor.java | 14 +++ .../dataregion/modification/Deletion.java | 9 ++ .../thrift-datanode/src/main/thrift/client.thrift | 6 ++ 7 files changed, 110 insertions(+), 57 deletions(-) diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java index 0daec0885bf..addf67bfc33 100644 --- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java +++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java @@ -23,10 +23,8 @@ import org.apache.iotdb.common.rpc.thrift.TAggregationType; import org.apache.iotdb.isession.SessionDataSet; import org.apache.iotdb.isession.SessionDataSet.DataIterator; import org.apache.iotdb.isession.template.Template; -import org.apache.iotdb.isession.util.Version; import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.StatementExecutionException; -import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.session.Session; import org.apache.iotdb.session.template.MeasurementNode; @@ -65,63 +63,61 @@ public class SessionExample { private static Random random = new Random(); - public static void main(String[] args) - throws IoTDBConnectionException, StatementExecutionException { + public static void main(String[] args) throws Exception { session = - new Session.Builder() - .host(LOCAL_HOST) - .port(6667) - .username("root") - .password("root") - .version(Version.V_1_0) - .build(); + new Session.Builder().host(LOCAL_HOST).port(6667).username("root").password("root").build(); session.open(false); - - // set session fetchSize - session.setFetchSize(10000); - - try { - session.createDatabase("root.sg1"); - } catch (StatementExecutionException e) { - if (e.getStatusCode() != TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) { - throw e; - } - } - - // createTemplate(); - createTimeseries(); - createMultiTimeseries(); - insertRecord(); - insertTablet(); - // insertTabletWithNullValues(); - // insertTablets(); - // insertRecords(); - // insertText(); - // selectInto(); - // createAndDropContinuousQueries(); - // nonQuery(); - query(); - // queryWithTimeout(); - rawDataQuery(); - lastDataQuery(); - aggregationQuery(); - groupByQuery(); - // queryByIterator(); - // deleteData(); - // deleteTimeseries(); - // setTimeout(); - - sessionEnableRedirect = new Session(LOCAL_HOST, 6667, "root", "root"); - sessionEnableRedirect.setEnableQueryRedirection(true); - sessionEnableRedirect.open(false); - - // set session fetchSize - sessionEnableRedirect.setFetchSize(10000); - - fastLastDataQueryForOneDevice(); - insertRecord4Redirect(); - query4Redirect(); - sessionEnableRedirect.close(); + session.executeNonQueryStatement( + "insert into root.sg.d1(time,s1, s2) values(1,1,1), (2,2,2), (3,3,3)"); + new Thread( + () -> { + try { + Session session2 = + new Session.Builder() + .host(LOCAL_HOST) + .port(6667) + .username("root") + .password("root") + .build(); + session2.open(false); + session2.executeNonQueryStatement("flush"); + System.out.println("====== After Flushed ======"); + session2.checkDeletionStatus(); + session2.close(); + } catch (IoTDBConnectionException e) { + throw new RuntimeException(e); + } catch (StatementExecutionException e) { + throw new RuntimeException(e); + } catch (Exception e) { + throw new RuntimeException(e); + } + }) + .start(); + new Thread( + () -> { + try { + Thread.sleep(100); + Session session2 = + new Session.Builder() + .host(LOCAL_HOST) + .port(6667) + .username("root") + .password("root") + .build(); + session2.open(false); + session2.executeNonQueryStatement("delete from root.sg.d1.s1 where time < 2"); + System.out.println("====== Flushing ======"); + session2.checkDeletionStatus(); + session2.close(); + } catch (IoTDBConnectionException e) { + throw new RuntimeException(e); + } catch (StatementExecutionException e) { + throw new RuntimeException(e); + } catch (Exception e) { + throw new RuntimeException(e); + } + }) + .start(); session.close(); } diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java index b7bf5daac0c..a1806363b62 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java @@ -41,6 +41,7 @@ import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq; import org.apache.iotdb.service.rpc.thrift.TSCreateSchemaTemplateReq; import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq; import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq; +import org.apache.iotdb.service.rpc.thrift.TSDeletionStatusResp; import org.apache.iotdb.service.rpc.thrift.TSDropSchemaTemplateReq; import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq; import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq; @@ -3850,6 +3851,14 @@ public class Session implements ISession { return defaultSessionConnection.fetchAllConnections(); } + public void checkDeletionStatus() throws Exception { + TSDeletionStatusResp resp = defaultSessionConnection.getDeletionStatus(); + Map<String, String> map = resp.getDeletionStatus(); + for (Map.Entry<String, String> entry : map.entrySet()) { + System.out.println(entry.getKey() + " : " + entry.getValue()); + } + } + public static class Builder { private String host = SessionConfig.DEFAULT_HOST; private int rpcPort = SessionConfig.DEFAULT_PORT; diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java index 425ae908e43..ab45b4cb18d 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java @@ -41,6 +41,7 @@ import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq; import org.apache.iotdb.service.rpc.thrift.TSCreateSchemaTemplateReq; import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq; import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq; +import org.apache.iotdb.service.rpc.thrift.TSDeletionStatusResp; import org.apache.iotdb.service.rpc.thrift.TSDropSchemaTemplateReq; import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq; import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp; @@ -1031,6 +1032,11 @@ public class SessionConnection { return execResp; } + public TSDeletionStatusResp getDeletionStatus() + throws IoTDBConnectionException, StatementExecutionException, TException { + return client.checkDeletionStatus(); + } + private <T> RetryResult<T> callWithReconnect(TFunction<T> supplier) throws IoTDBConnectionException { T ret; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java index 3c710d0a82f..42b87f9652c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java @@ -104,6 +104,8 @@ import org.apache.iotdb.db.queryengine.plan.statement.metadata.template.UnsetSch import org.apache.iotdb.db.schemaengine.template.TemplateQueryType; import org.apache.iotdb.db.storageengine.StorageEngine; import org.apache.iotdb.db.storageengine.dataregion.DataRegion; +import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor; +import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion; import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource; import org.apache.iotdb.db.storageengine.rescon.quotas.DataNodeThrottleQuotaManager; import org.apache.iotdb.db.storageengine.rescon.quotas.OperationQuota; @@ -132,6 +134,7 @@ import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq; import org.apache.iotdb.service.rpc.thrift.TSCreateSchemaTemplateReq; import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq; import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq; +import org.apache.iotdb.service.rpc.thrift.TSDeletionStatusResp; import org.apache.iotdb.service.rpc.thrift.TSDropSchemaTemplateReq; import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementReq; import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq; @@ -2771,6 +2774,16 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } } + @Override + public TSDeletionStatusResp checkDeletionStatus() { + Map<String, String> resultMap = new HashMap<>(); + for (Map.Entry<Integer, Deletion> entry : + TsFileProcessor.flushingMemtableDeletionMap.entrySet()) { + resultMap.put(entry.getKey().toString(), entry.getValue().toString()); + } + return new TSDeletionStatusResp(resultMap); + } + private TSExecuteStatementResp createResponse(DatasetHeader header, long queryId) { TSExecuteStatementResp resp = RpcUtils.getTSExecuteStatementResp(TSStatusCode.SUCCESS_STATUS); resp.setColumnNameIndexMap(header.getColumnNameIndexMap()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java index bd4c11e969b..b85e3d75b9c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java @@ -104,11 +104,13 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -203,6 +205,10 @@ public class TsFileProcessor { public static final int MEMTABLE_NOT_EXIST = -1; + public static Map<Integer, Deletion> flushingMemtableDeletionMap = new ConcurrentHashMap<>(); + + private static final AtomicInteger idGenerator = new AtomicInteger(0); + @SuppressWarnings("squid:S107") public TsFileProcessor( String storageGroupName, @@ -1041,6 +1047,8 @@ public class TsFileProcessor { // Flushing memTables are immutable, only record this deletion in these memTables for read if (!flushingMemTables.isEmpty()) { modsToMemtable.add(new Pair<>(deletion, flushingMemTables.getLast())); + deletion.setDeleteStatus(false); + flushingMemtableDeletionMap.put(idGenerator.getAndIncrement(), deletion); } } finally { flushQueryLock.writeLock().unlock(); @@ -1427,6 +1435,11 @@ public class TsFileProcessor { } } } + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } try { flushQueryLock.writeLock().lock(); @@ -1437,6 +1450,7 @@ public class TsFileProcessor { entry.left.setFileOffset(tsFileResource.getTsFileSize()); this.tsFileResource.getModFile().write(entry.left); tsFileResource.getModFile().close(); + ((Deletion) (entry.left)).setDeleteStatus(true); iterator.remove(); logger.info( "[Deletion] Deletion with path: {}, time:{}-{} written when flush memtable", diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/Deletion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/Deletion.java index 5ab469fde10..d46f1f00c9a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/Deletion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/Deletion.java @@ -30,6 +30,7 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; /** Deletion is a delete operation on a timeseries. */ public class Deletion extends Modification implements Cloneable { @@ -37,6 +38,8 @@ public class Deletion extends Modification implements Cloneable { /** data within the interval [startTime, endTime] are to be deleted. */ private TimeRange timeRange; + private AtomicBoolean deleteStatus = new AtomicBoolean(true); + /** * constructor of Deletion, the start time is set to Long.MIN_VALUE * @@ -70,6 +73,10 @@ public class Deletion extends Modification implements Cloneable { } } + public void setDeleteStatus(boolean deleteStatus) { + this.deleteStatus.set(deleteStatus); + } + public long getStartTime() { return this.timeRange.getMin(); } @@ -157,6 +164,8 @@ public class Deletion extends Modification implements Cloneable { + path + ", fileOffset=" + fileOffset + + ", status=" + + (deleteStatus.get() ? "COMPLETED" : "PENDING") + '}'; } diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift index 0fa1a9a6ecb..3e39828e0ad 100644 --- a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift +++ b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift @@ -529,8 +529,14 @@ struct TSConnectionInfoResp { 1: required list<TSConnectionInfo> connectionInfoList } +struct TSDeletionStatusResp { + 1: required map<string, string> deletionStatus +} + service IClientRPCService { + TSDeletionStatusResp checkDeletionStatus(); + TSExecuteStatementResp executeQueryStatementV2(1:TSExecuteStatementReq req); TSExecuteStatementResp executeUpdateStatementV2(1:TSExecuteStatementReq req);