This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch test-1
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 96f32ae7380b861335cf82857c75f630b3cb2378
Author: HTHou <hao...@apache.org>
AuthorDate: Fri Jun 20 11:16:48 2025 +0800

    dev test-1
---
 .../main/java/org/apache/iotdb/SessionExample.java | 110 ++++++++++-----------
 .../java/org/apache/iotdb/session/Session.java     |   9 ++
 .../apache/iotdb/session/SessionConnection.java    |   6 ++
 .../protocol/thrift/impl/ClientRPCServiceImpl.java |  13 +++
 .../dataregion/memtable/TsFileProcessor.java       |  14 +++
 .../dataregion/modification/Deletion.java          |   9 ++
 .../thrift-datanode/src/main/thrift/client.thrift  |   6 ++
 7 files changed, 110 insertions(+), 57 deletions(-)

diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index 0daec0885bf..addf67bfc33 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -23,10 +23,8 @@ import org.apache.iotdb.common.rpc.thrift.TAggregationType;
 import org.apache.iotdb.isession.SessionDataSet;
 import org.apache.iotdb.isession.SessionDataSet.DataIterator;
 import org.apache.iotdb.isession.template.Template;
-import org.apache.iotdb.isession.util.Version;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.session.Session;
 import org.apache.iotdb.session.template.MeasurementNode;
 
@@ -65,63 +63,61 @@ public class SessionExample {
 
   private static Random random = new Random();
 
-  public static void main(String[] args)
-      throws IoTDBConnectionException, StatementExecutionException {
+  public static void main(String[] args) throws Exception {
     session =
-        new Session.Builder()
-            .host(LOCAL_HOST)
-            .port(6667)
-            .username("root")
-            .password("root")
-            .version(Version.V_1_0)
-            .build();
+        new Session.Builder().host(LOCAL_HOST).port(6667).username("root").password("root").build();
     session.open(false);
-
-    // set session fetchSize
-    session.setFetchSize(10000);
-
-    try {
-      session.createDatabase("root.sg1");
-    } catch (StatementExecutionException e) {
-      if (e.getStatusCode() != TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) {
-        throw e;
-      }
-    }
-
-    //     createTemplate();
-    createTimeseries();
-    createMultiTimeseries();
-    insertRecord();
-    insertTablet();
-    //    insertTabletWithNullValues();
-    //    insertTablets();
-    //    insertRecords();
-    //    insertText();
-    //    selectInto();
-    //    createAndDropContinuousQueries();
-    //    nonQuery();
-    query();
-    //    queryWithTimeout();
-    rawDataQuery();
-    lastDataQuery();
-    aggregationQuery();
-    groupByQuery();
-    //    queryByIterator();
-    //    deleteData();
-    //    deleteTimeseries();
-    //    setTimeout();
-
-    sessionEnableRedirect = new Session(LOCAL_HOST, 6667, "root", "root");
-    sessionEnableRedirect.setEnableQueryRedirection(true);
-    sessionEnableRedirect.open(false);
-
-    // set session fetchSize
-    sessionEnableRedirect.setFetchSize(10000);
-
-    fastLastDataQueryForOneDevice();
-    insertRecord4Redirect();
-    query4Redirect();
-    sessionEnableRedirect.close();
+    session.executeNonQueryStatement(
+        "insert into root.sg.d1(time,s1, s2) values(1,1,1), (2,2,2), (3,3,3)");
+    new Thread(
+            () -> {
+              try {
+                Session session2 =
+                    new Session.Builder()
+                        .host(LOCAL_HOST)
+                        .port(6667)
+                        .username("root")
+                        .password("root")
+                        .build();
+                session2.open(false);
+                session2.executeNonQueryStatement("flush");
+                System.out.println("====== After Flushed ======");
+                session2.checkDeletionStatus();
+                session2.close();
+              } catch (IoTDBConnectionException e) {
+                throw new RuntimeException(e);
+              } catch (StatementExecutionException e) {
+                throw new RuntimeException(e);
+              } catch (Exception e) {
+                throw new RuntimeException(e);
+              }
+            })
+        .start();
+    new Thread(
+            () -> {
+              try {
+                Thread.sleep(100);
+                Session session2 =
+                    new Session.Builder()
+                        .host(LOCAL_HOST)
+                        .port(6667)
+                        .username("root")
+                        .password("root")
+                        .build();
+                session2.open(false);
+                session2.executeNonQueryStatement("delete from root.sg.d1.s1 where time < 2");
+                System.out.println("====== Flushing ======");
+                session2.checkDeletionStatus();
+                session2.close();
+              } catch (IoTDBConnectionException e) {
+                throw new RuntimeException(e);
+              } catch (StatementExecutionException e) {
+                throw new RuntimeException(e);
+              } catch (Exception e) {
+                throw new RuntimeException(e);
+              }
+            })
+        .start();
     session.close();
   }
 
diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
index b7bf5daac0c..a1806363b62 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -41,6 +41,7 @@ import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
 import org.apache.iotdb.service.rpc.thrift.TSCreateSchemaTemplateReq;
 import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
 import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
+import org.apache.iotdb.service.rpc.thrift.TSDeletionStatusResp;
 import org.apache.iotdb.service.rpc.thrift.TSDropSchemaTemplateReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
@@ -3850,6 +3851,14 @@ public class Session implements ISession {
     return defaultSessionConnection.fetchAllConnections();
   }
 
+  public void checkDeletionStatus() throws Exception {
+    TSDeletionStatusResp resp = defaultSessionConnection.getDeletionStatus();
+    Map<String, String> map = resp.getDeletionStatus();
+    for (Map.Entry<String, String> entry : map.entrySet()) {
+      System.out.println(entry.getKey() + " : " + entry.getValue());
+    }
+  }
+
   public static class Builder {
     private String host = SessionConfig.DEFAULT_HOST;
     private int rpcPort = SessionConfig.DEFAULT_PORT;
diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 425ae908e43..ab45b4cb18d 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -41,6 +41,7 @@ import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
 import org.apache.iotdb.service.rpc.thrift.TSCreateSchemaTemplateReq;
 import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
 import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
+import org.apache.iotdb.service.rpc.thrift.TSDeletionStatusResp;
 import org.apache.iotdb.service.rpc.thrift.TSDropSchemaTemplateReq;
 import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
 import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
@@ -1031,6 +1032,11 @@ public class SessionConnection {
     return execResp;
   }
 
+  public TSDeletionStatusResp getDeletionStatus()
+      throws IoTDBConnectionException, StatementExecutionException, TException {
+    return client.checkDeletionStatus();
+  }
+
   private <T> RetryResult<T> callWithReconnect(TFunction<T> supplier)
       throws IoTDBConnectionException {
     T ret;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
index 3c710d0a82f..42b87f9652c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
@@ -104,6 +104,8 @@ import org.apache.iotdb.db.queryengine.plan.statement.metadata.template.UnsetSch
 import org.apache.iotdb.db.schemaengine.template.TemplateQueryType;
 import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor;
+import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
 import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
 import org.apache.iotdb.db.storageengine.rescon.quotas.DataNodeThrottleQuotaManager;
 import org.apache.iotdb.db.storageengine.rescon.quotas.OperationQuota;
@@ -132,6 +134,7 @@ import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
 import org.apache.iotdb.service.rpc.thrift.TSCreateSchemaTemplateReq;
 import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
 import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
+import org.apache.iotdb.service.rpc.thrift.TSDeletionStatusResp;
 import org.apache.iotdb.service.rpc.thrift.TSDropSchemaTemplateReq;
 import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementReq;
 import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
@@ -2771,6 +2774,16 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
     }
   }
 
+  @Override
+  public TSDeletionStatusResp checkDeletionStatus() {
+    Map<String, String> resultMap = new HashMap<>();
+    for (Map.Entry<Integer, Deletion> entry :
+        TsFileProcessor.flushingMemtableDeletionMap.entrySet()) {
+      resultMap.put(entry.getKey().toString(), entry.getValue().toString());
+    }
+    return new TSDeletionStatusResp(resultMap);
+  }
+
   private TSExecuteStatementResp createResponse(DatasetHeader header, long queryId) {
     TSExecuteStatementResp resp = RpcUtils.getTSExecuteStatementResp(TSStatusCode.SUCCESS_STATUS);
     resp.setColumnNameIndexMap(header.getColumnNameIndexMap());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index bd4c11e969b..b85e3d75b9c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -104,11 +104,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -203,6 +205,10 @@ public class TsFileProcessor {
 
   public static final int MEMTABLE_NOT_EXIST = -1;
 
+  public static Map<Integer, Deletion> flushingMemtableDeletionMap = new ConcurrentHashMap<>();
+
+  private static final AtomicInteger idGenerator = new AtomicInteger(0);
+
   @SuppressWarnings("squid:S107")
   public TsFileProcessor(
       String storageGroupName,
@@ -1041,6 +1047,8 @@ public class TsFileProcessor {
       // Flushing memTables are immutable, only record this deletion in these memTables for read
       if (!flushingMemTables.isEmpty()) {
         modsToMemtable.add(new Pair<>(deletion, flushingMemTables.getLast()));
+        deletion.setDeleteStatus(false);
+        flushingMemtableDeletionMap.put(idGenerator.getAndIncrement(), deletion);
       }
     } finally {
       flushQueryLock.writeLock().unlock();
@@ -1427,6 +1435,11 @@ public class TsFileProcessor {
         }
       }
     }
+    try {
+      Thread.sleep(5000);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
 
     try {
       flushQueryLock.writeLock().lock();
@@ -1437,6 +1450,7 @@ public class TsFileProcessor {
           entry.left.setFileOffset(tsFileResource.getTsFileSize());
           this.tsFileResource.getModFile().write(entry.left);
           tsFileResource.getModFile().close();
+          ((Deletion) (entry.left)).setDeleteStatus(true);
           iterator.remove();
           logger.info(
               "[Deletion] Deletion with path: {}, time:{}-{} written when flush memtable",
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/Deletion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/Deletion.java
index 5ab469fde10..d46f1f00c9a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/Deletion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/Deletion.java
@@ -30,6 +30,7 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /** Deletion is a delete operation on a timeseries. */
 public class Deletion extends Modification implements Cloneable {
@@ -37,6 +38,8 @@ public class Deletion extends Modification implements Cloneable {
   /** data within the interval [startTime, endTime] are to be deleted. */
   private TimeRange timeRange;
 
+  private AtomicBoolean deleteStatus = new AtomicBoolean(true);
+
   /**
    * constructor of Deletion, the start time is set to Long.MIN_VALUE
    *
@@ -70,6 +73,10 @@ public class Deletion extends Modification implements Cloneable {
     }
   }
 
+  public void setDeleteStatus(boolean deleteStatus) {
+    this.deleteStatus.set(deleteStatus);
+  }
+
   public long getStartTime() {
     return this.timeRange.getMin();
   }
@@ -157,6 +164,8 @@ public class Deletion extends Modification implements Cloneable {
         + path
         + ", fileOffset="
         + fileOffset
+        + ", status="
+        + (deleteStatus.get() ? "COMPLETED" : "PENDING")
         + '}';
   }
 
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
index 0fa1a9a6ecb..3e39828e0ad 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
@@ -529,8 +529,14 @@ struct TSConnectionInfoResp {
   1: required list<TSConnectionInfo> connectionInfoList
 }
 
+struct TSDeletionStatusResp {
+  1: required map<string, string> deletionStatus
+}
+
 service IClientRPCService {
 
+  TSDeletionStatusResp checkDeletionStatus();
+
   TSExecuteStatementResp executeQueryStatementV2(1:TSExecuteStatementReq req);
 
   TSExecuteStatementResp executeUpdateStatementV2(1:TSExecuteStatementReq req);

Reply via email to