This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 24d99ef5d3 [IOTDB-4829] Let NoMoreTsBlockEvent RPC is called in async 
way (#7911)
24d99ef5d3 is described below

commit 24d99ef5d3af1966c3e5d63aa9fd389a8543c4bb
Author: Liao Lanyu <[email protected]>
AuthorDate: Mon Nov 7 14:44:54 2022 +0800

    [IOTDB-4829] Let NoMoreTsBlockEvent RPC is called in async way (#7911)
---
 docs/UserGuide/Reference/Syntax-Conventions.md     | 38 +++++-----
 docs/zh/UserGuide/Reference/Syntax-Conventions.md  | 40 +++++------
 .../db/mpp/execution/exchange/SinkHandle.java      | 84 ++++++++++++----------
 .../db/mpp/execution/exchange/SinkHandleTest.java  | 28 ++++++--
 4 files changed, 108 insertions(+), 82 deletions(-)

diff --git a/docs/UserGuide/Reference/Syntax-Conventions.md 
b/docs/UserGuide/Reference/Syntax-Conventions.md
index bf76c34195..bed70a2c88 100644
--- a/docs/UserGuide/Reference/Syntax-Conventions.md
+++ b/docs/UserGuide/Reference/Syntax-Conventions.md
@@ -20,11 +20,11 @@
 -->
 
 
-# Literal Values
+## Literal Values
 
 This section describes how to write literal values in IoTDB. These include 
strings, numbers, timestamp values, boolean values, and NULL.
 
-## String Literals
+### String Literals
 
 > We refer to MySQL's definition of string:A string is a sequence of bytes or 
 > characters, enclosed within either single quote (`'`) or double quote (`"`) 
 > characters.
 
@@ -111,7 +111,7 @@ Usages of string literals:
 - The key/value of an attribute can be String Literal and identifier, more 
details can be found at **key-value pair** part. 
 
 
-### How to use quotation marks in String Literals
+#### How to use quotation marks in String Literals
 
 There are several ways to include quote characters within a string:
 
@@ -132,7 +132,7 @@ The following examples demonstrate how quoting and escaping 
work:
 """string"  // "string
 ```
 
-## Numeric Literals
+### Numeric Literals
 
 Number literals include integer (exact-value) literals and floating-point 
(approximate-value) literals.
 
@@ -146,27 +146,27 @@ The `FLOAT` and `DOUBLE` data types are floating-point 
types and calculations ar
 
 An integer may be used in floating-point context; it is interpreted as the 
equivalent floating-point number.
 
-## Timestamp Literals
+### Timestamp Literals
 
 The timestamp is the time point at which data is produced. It includes 
absolute timestamps and relative timestamps in IoTDB. For information about 
timestamp support in IoTDB, see [Data Type Doc](../Data-Concept/Data-Type.md).
 
 Specially, `NOW()` represents a constant timestamp that indicates the system 
time at which the statement began to execute.
 
-## Boolean Literals
+### Boolean Literals
 
 The constants `TRUE` and `FALSE` evaluate to 1 and 0, respectively. The 
constant names can be written in any lettercase.
 
-## NULL Values
+### NULL Values
 
 The `NULL` value means “no data.” `NULL` can be written in any lettercase.
 
-# Identifiers
+## Identifiers
 
-## Usage scenarios
+### Usage scenarios
 
 Certain objects within IoTDB, including `TRIGGER`, `FUNCTION`(UDF), 
`CONTINUOUS QUERY`, `SCHEMA TEMPLATE`, `USER`, `ROLE`,`Pipe`,`PipeSink`,`alias` 
and other object names are known as identifiers.
 
-## Constraints
+### Constraints
 
 Below are basic constraints of identifiers, specific identifiers may have 
other constraints, for example, `user` should consists of more than 4 
characters. 
 
@@ -182,7 +182,7 @@ Below are basic constraints of identifiers, specific 
identifiers may have other
 - Identifier contains special characters.
 - Identifier that is a real number.
 
-## How to use quotations marks in quoted identifiers
+### How to use quotations marks in quoted identifiers
 
 `'` and `"` can be used directly in quoted identifiers.
 
@@ -198,7 +198,7 @@ create schema template `t1``t`
 (temperature FLOAT encoding=RLE, status BOOLEAN encoding=PLAIN 
compression=SNAPPY)
 ```
 
-## Examples
+### Examples
 
 Examples of case in which quoted identifier is used :
 
@@ -279,11 +279,11 @@ Examples of case in which quoted identifier is used :
 - The key/value of an attribute can be String Literal and identifier, more 
details can be found at **key-value pair** part. 
 
 
-# Node Names in Path
+## Node Names in Path
 
 Node name is a special identifier, it can also be wildcard `*` and `**`. When 
creating timeseries, node name can not be wildcard. In query statment, you can 
use wildcard to match one or more nodes of path.
 
-## Wildcard
+### Wildcard
 
 `*` represents one node. For example, `root.vehicle.*.sensor1` represents a 
4-node path which is prefixed with `root.vehicle` and suffixed with `sensor1`.
 
@@ -315,7 +315,7 @@ select a*b from root.sg
 |Time|root.sg.a * root.sg.b|
 ```
 
-## Identifier
+### Identifier
 
 When node name is not wildcard, it is a identifier, which means the 
constraints on it is the same as described in Identifier part.
 
@@ -378,7 +378,7 @@ Results:
 +-----------------------------+-----------+
 ```
 
-# Key-Value Pair
+## Key-Value Pair
 
 **The key/value of an attribute can be constant(including string) and 
identifier. **
 
@@ -477,13 +477,13 @@ CREATE PIPE my_pipe TO my_iotdb FROM
 (select ** from root WHERE time>=yyyy-mm-dd HH:MM:SS) WITH 'SyncDelOp' = 'true'
 ```
 
-# Keywords and Reserved Words
+## Keywords and Reserved Words
 
 Keywords are words that have significance in SQL. Keywords can be used as an 
identifier. Certain keywords, such as TIME/TIMESTAMP and ROOT, are reserved and 
cannot use as identifiers.
 
 [Keywords and Reserved Words](Keywords.md) shows the keywords and reserved 
words in IoTDB.
 
-# Session、TsFile API
+## Session、TsFile API
 
 When using the Session and TsFile APIs, if the method you call requires 
parameters such as measurement, device, storage group, path in the form of 
String, **please ensure that the parameters passed in the input string is the 
same as when using the SQL statement**, here are some examples to help you 
understand. Code example could be found at: 
`example/session/src/main/java/org/apache/iotdb/SyntaxConventionRelatedExample.java`
 
@@ -582,7 +582,7 @@ String[] paths = new String[]{"root.sg.a", 
"root.sg.`a.``\"b`", "root.sg.`111`"}
 List<String> pathList = Arrays.asList(paths);
 ```
 
-# Detailed Definitions of Lexical and Grammar
+## Detailed Definitions of Lexical and Grammar
 
 Please read the lexical and grammar description files in our code repository:
 
diff --git a/docs/zh/UserGuide/Reference/Syntax-Conventions.md 
b/docs/zh/UserGuide/Reference/Syntax-Conventions.md
index 6a94023f52..a942ce8da5 100644
--- a/docs/zh/UserGuide/Reference/Syntax-Conventions.md
+++ b/docs/zh/UserGuide/Reference/Syntax-Conventions.md
@@ -20,11 +20,11 @@
 -->
 
 
-# 字面值常量
+## 字面值常量
 
 该部分对 IoTDB 中支持的字面值常量进行说明,包括字符串常量、数值型常量、时间戳常量、布尔型常量和空值。
 
-## 字符串常量
+### 字符串常量
 
 > 我们参照了 MySQL 对 字符串的定义:A string is a sequence of bytes or characters, enclosed 
 > within either single quote (`'`) or double quote (`"`) characters.
 
@@ -37,7 +37,7 @@ MySQL 对字符串的定义可以参考:[MySQL :: MySQL 8.0 Reference Manual :
 "another string"
 ```
 
-### 使用场景
+#### 使用场景
 
 - `INSERT` 或者 `SELECT` 中用于表达 `TEXT` 类型数据的场景。
 
@@ -108,7 +108,7 @@ MySQL 对字符串的定义可以参考:[MySQL :: MySQL 8.0 Reference Manual :
 
 - 用于表示键值对,键值对的键和值可以被定义成常量(包括字符串)或者标识符,具体请参考键值对章节。
 
-### 如何在字符串内使用引号
+#### 如何在字符串内使用引号
 
 - 在单引号引起的字符串内,双引号无需特殊处理。同理,在双引号引起的字符串内,单引号无需特殊处理。
 - 在单引号引起的字符串里,可以通过双写单引号来表示一个单引号,即单引号 ' 可以表示为 ''。
@@ -128,7 +128,7 @@ MySQL 对字符串的定义可以参考:[MySQL :: MySQL 8.0 Reference Manual :
 """string"  // "string
 ```
 
-## 数值型常量
+### 数值型常量
 
 数值型常量包括整型和浮点型。
 
@@ -140,27 +140,27 @@ MySQL 对字符串的定义可以参考:[MySQL :: MySQL 8.0 Reference Manual :
 
 在浮点上下文中可以使用整数,它会被解释为等效的浮点数。
 
-## 时间戳常量
+### 时间戳常量
 
 时间戳是一个数据到来的时间点,在 IoTDB 中分为绝对时间戳和相对时间戳。详细信息可参考 
[数据类型文档](https://iotdb.apache.org/zh/UserGuide/Master/Data-Concept/Data-Type.html)。
 
 特别地,`NOW()`表示语句开始执行时的服务端系统时间戳。
 
-## 布尔型常量
+### 布尔型常量
 
 布尔值常量 `TRUE` 和 `FALSE` 分别等价于 `1` 和 `0`,它们对大小写不敏感。
 
-## 空值
+### 空值
 
 `NULL`值表示没有数据。`NULL`对大小写不敏感。
 
-# 标识符
+## 标识符
 
-## 使用场景
+### 使用场景
 
 在 IoTDB 中,触发器名称、UDF函数名、元数据模板名称、用户与角色名、连续查询标识、Pipe、PipeSink、键值对中的键和值、别名等可以作为标识符。
 
-## 约束
+### 约束
 
 请注意,此处约束是标识符的通用约束,具体标识符可能还附带其它约束条件,如用户名限制字符数大于等于4,更严格的约束请参考具体标识符相关的说明文档。
 
@@ -181,7 +181,7 @@ MySQL 对字符串的定义可以参考:[MySQL :: MySQL 8.0 Reference Manual :
 - 标识符包含不允许的特殊字符。
 - 标识符为实数。
 
-## 如何在反引号引起的标识符中使用引号
+### 如何在反引号引起的标识符中使用引号
 
 **在反引号引起的标识符中可以直接使用单引号和双引号。**
 
@@ -197,7 +197,7 @@ create schema template `t1't"t`
 (temperature FLOAT encoding=RLE, status BOOLEAN encoding=PLAIN 
compression=SNAPPY)
 ```
 
-## 特殊情况示例
+### 特殊情况示例
 
 需要使用反引号进行引用的部分情况示例:
 
@@ -277,11 +277,11 @@ create schema template `t1't"t`
 - 用于表示键值对,键值对的键和值可以被定义成常量(包括字符串)或者标识符,具体请参考键值对章节。
 
 
-# 路径结点名
+## 路径结点名
 
 路径结点名是特殊的标识符,其还可以是通配符 \* 或 \*\*。在创建时间序列时,各层级的路径结点名不能为通配符 \* 或 
\*\*。在查询语句中,可以用通配符 \* 或 \*\* 来表示路径结点名,以匹配一层或多层路径。
 
-## 通配符
+### 通配符
 
 
`*`在路径中表示一层。例如`root.vehicle.*.sensor1`代表的是以`root.vehicle`为前缀,以`sensor1`为后缀,层次等于 
4 层的路径。
 
@@ -312,7 +312,7 @@ select a*b from root.sg
 |Time|root.sg.a * root.sg.b|
 ```
 
-## 标识符
+### 标识符
 
 路径结点名不为通配符时,使用方法和标识符一致。**在 SQL 中需要使用反引号引用的路径结点,在结果集中也会用反引号引起。**
 
@@ -377,7 +377,7 @@ select `111` from root.sg
 +-----------------------------+-------------+
 ```
 
-# 键值对
+## 键值对
 
 **键值对的键和值可以被定义为标识符或者常量。**
 
@@ -476,13 +476,13 @@ CREATE PIPE my_pipe TO my_iotdb FROM
 (select ** from root WHERE time>=yyyy-mm-dd HH:MM:SS) WITH 'SyncDelOp' = 'true'
 ```
 
-# 关键字和保留字
+## 关键字和保留字
 
 关键字是在 SQL 具有特定含义的词,可以作为标识符。保留字是关键字的一个子集,保留字不能用于标识符。
 
 关于 IoTDB 的关键字和保留字列表,可以查看 
[关键字和保留字](https://iotdb.apache.org/zh/UserGuide/Master/Reference/Keywords.html) 
。
 
-# Session、TsFile API
+## Session、TsFile API
 
 在使用Session、TsFIle 
API时,如果您调用的方法需要以字符串形式传入物理量(measurement)、设备(device)、存储组(storage 
group)、路径(path)等参数,**请保证所传入字符串与使用 SQL 
语句时的写法一致**,下面是一些帮助您理解的例子。具体代码示例可以参考:`example/session/src/main/java/org/apache/iotdb/SyntaxConventionRelatedExample.java`
 
@@ -581,7 +581,7 @@ String[] paths = new String[]{"root.sg.a", 
"root.sg.`a.``\"b`", "root.sg.`111`"}
 List<String> pathList = Arrays.asList(paths);
 ```
 
-# 词法与文法详细定义
+## 词法与文法详细定义
 
 请阅读代码仓库中的词法和语法描述文件:
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
index cdd891c66b..5c37286c78 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
@@ -170,48 +170,13 @@ public class SinkHandle implements ISinkHandle {
     throw new UnsupportedOperationException();
   }
 
-  private void sendEndOfDataBlockEvent() throws Exception {
-    logger.debug("[NotifyNoMoreTsBlock]");
-    int attempt = 0;
-    TEndOfDataBlockEvent endOfDataBlockEvent =
-        new TEndOfDataBlockEvent(
-            remoteFragmentInstanceId,
-            remotePlanNodeId,
-            localFragmentInstanceId,
-            nextSequenceId - 1);
-    while (attempt < MAX_ATTEMPT_TIMES) {
-      attempt += 1;
-      try (SyncDataNodeMPPDataExchangeServiceClient client =
-          mppDataExchangeServiceClientManager.borrowClient(remoteEndpoint)) {
-        client.onEndOfDataBlockEvent(endOfDataBlockEvent);
-        break;
-      } catch (Throwable e) {
-        logger.error("Failed to send end of data block event, attempt times: 
{}", attempt, e);
-        if (attempt == MAX_ATTEMPT_TIMES) {
-          throw e;
-        }
-        Thread.sleep(retryIntervalInMs);
-      }
-    }
-  }
-
   @Override
   public synchronized void setNoMoreTsBlocks() {
     logger.debug("[StartSetNoMoreTsBlocks]");
     if (aborted || closed) {
       return;
     }
-    try {
-      sendEndOfDataBlockEvent();
-    } catch (Exception e) {
-      throw new RuntimeException("Send EndOfDataBlockEvent failed", e);
-    }
-    noMoreTsBlocks = true;
-
-    if (isFinished()) {
-      sinkHandleListener.onFinish(this);
-    }
-    sinkHandleListener.onEndOfBlocks(this);
+    executorService.submit(new SendEndOfDataBlockEventTask());
   }
 
   @Override
@@ -413,4 +378,51 @@ public class SinkHandle implements ISinkHandle {
       }
     }
   }
+
+  /**
+   * Send a {@link org.apache.iotdb.mpp.rpc.thrift.TEndOfDataBlockEvent} to 
downstream fragment
+   * instance.
+   */
+  class SendEndOfDataBlockEventTask implements Runnable {
+
+    @Override
+    public void run() {
+      try (SetThreadName sinkHandleName = new SetThreadName(threadName)) {
+        logger.debug("[NotifyNoMoreTsBlock]");
+        int attempt = 0;
+        TEndOfDataBlockEvent endOfDataBlockEvent =
+            new TEndOfDataBlockEvent(
+                remoteFragmentInstanceId,
+                remotePlanNodeId,
+                localFragmentInstanceId,
+                nextSequenceId - 1);
+        while (attempt < MAX_ATTEMPT_TIMES) {
+          attempt += 1;
+          try (SyncDataNodeMPPDataExchangeServiceClient client =
+              
mppDataExchangeServiceClientManager.borrowClient(remoteEndpoint)) {
+            client.onEndOfDataBlockEvent(endOfDataBlockEvent);
+            break;
+          } catch (Throwable e) {
+            logger.error("Failed to send end of data block event, attempt 
times: {}", attempt, e);
+            if (attempt == MAX_ATTEMPT_TIMES) {
+              logger.error("Failed to send end of data block event after all 
retry", e);
+              sinkHandleListener.onFailure(SinkHandle.this, e);
+              return;
+            }
+            try {
+              Thread.sleep(retryIntervalInMs);
+            } catch (InterruptedException ex) {
+              Thread.currentThread().interrupt();
+              sinkHandleListener.onFailure(SinkHandle.this, e);
+            }
+          }
+        }
+        noMoreTsBlocks = true;
+        if (isFinished()) {
+          sinkHandleListener.onFinish(SinkHandle.this);
+        }
+        sinkHandleListener.onEndOfBlocks(SinkHandle.this);
+      }
+    }
+  }
 }
diff --git 
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java
 
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java
index 411370c14a..5b194b81d7 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java
@@ -145,7 +145,13 @@ public class SinkHandleTest {
     Assert.assertTrue(sinkHandle.isFull().isDone());
     Assert.assertFalse(sinkHandle.isFinished());
     Assert.assertFalse(sinkHandle.isAborted());
-    Mockito.verify(mockSinkHandleListener, 
Mockito.times(1)).onEndOfBlocks(sinkHandle);
+    try {
+      Thread.sleep(500L);
+      Mockito.verify(mockSinkHandleListener, 
Mockito.times(1)).onEndOfBlocks(sinkHandle);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+      Assert.fail();
+    }
 
     // Ack tsblocks.
     sinkHandle.acknowledgeTsBlock(0, numOfMockTsBlock);
@@ -311,7 +317,14 @@ public class SinkHandleTest {
     sinkHandle.setNoMoreTsBlocks();
     Assert.assertFalse(sinkHandle.isFinished());
     Assert.assertFalse(sinkHandle.isAborted());
-    Mockito.verify(mockSinkHandleListener, 
Mockito.times(1)).onEndOfBlocks(sinkHandle);
+    try {
+      Thread.sleep(500L);
+      Mockito.verify(mockSinkHandleListener, 
Mockito.times(1)).onEndOfBlocks(sinkHandle);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+      Assert.fail();
+    }
+
     try {
       Thread.sleep(100L);
       Mockito.verify(mockClient, Mockito.times(1))
@@ -439,12 +452,13 @@ public class SinkHandleTest {
     // Close the SinkHandle.
     try {
       sinkHandle.setNoMoreTsBlocks();
-      Assert.fail("Expect an RuntimeException.");
-    } catch (RuntimeException e) {
-      Assert.assertEquals("Send EndOfDataBlockEvent failed", e.getMessage());
+      Assert.assertFalse(sinkHandle.isAborted());
+      Thread.sleep(500L);
+      Mockito.verify(mockSinkHandleListener, 
Mockito.times(0)).onEndOfBlocks(sinkHandle);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+      Assert.fail();
     }
-    Assert.assertFalse(sinkHandle.isAborted());
-    Mockito.verify(mockSinkHandleListener, 
Mockito.times(0)).onEndOfBlocks(sinkHandle);
 
     // Abort the SinkHandle.
     sinkHandle.abort();

Reply via email to