This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 24d99ef5d3 [IOTDB-4829] Let NoMoreTsBlockEvent RPC is called in async
way (#7911)
24d99ef5d3 is described below
commit 24d99ef5d3af1966c3e5d63aa9fd389a8543c4bb
Author: Liao Lanyu <[email protected]>
AuthorDate: Mon Nov 7 14:44:54 2022 +0800
[IOTDB-4829] Let NoMoreTsBlockEvent RPC is called in async way (#7911)
---
docs/UserGuide/Reference/Syntax-Conventions.md | 38 +++++-----
docs/zh/UserGuide/Reference/Syntax-Conventions.md | 40 +++++------
.../db/mpp/execution/exchange/SinkHandle.java | 84 ++++++++++++----------
.../db/mpp/execution/exchange/SinkHandleTest.java | 28 ++++++--
4 files changed, 108 insertions(+), 82 deletions(-)
diff --git a/docs/UserGuide/Reference/Syntax-Conventions.md
b/docs/UserGuide/Reference/Syntax-Conventions.md
index bf76c34195..bed70a2c88 100644
--- a/docs/UserGuide/Reference/Syntax-Conventions.md
+++ b/docs/UserGuide/Reference/Syntax-Conventions.md
@@ -20,11 +20,11 @@
-->
-# Literal Values
+## Literal Values
This section describes how to write literal values in IoTDB. These include
strings, numbers, timestamp values, boolean values, and NULL.
-## String Literals
+### String Literals
> We refer to MySQL's definition of string:A string is a sequence of bytes or
> characters, enclosed within either single quote (`'`) or double quote (`"`)
> characters.
@@ -111,7 +111,7 @@ Usages of string literals:
- The key/value of an attribute can be String Literal and identifier, more
details can be found at **key-value pair** part.
-### How to use quotation marks in String Literals
+#### How to use quotation marks in String Literals
There are several ways to include quote characters within a string:
@@ -132,7 +132,7 @@ The following examples demonstrate how quoting and escaping
work:
"""string" // "string
```
-## Numeric Literals
+### Numeric Literals
Number literals include integer (exact-value) literals and floating-point
(approximate-value) literals.
@@ -146,27 +146,27 @@ The `FLOAT` and `DOUBLE` data types are floating-point
types and calculations ar
An integer may be used in floating-point context; it is interpreted as the
equivalent floating-point number.
-## Timestamp Literals
+### Timestamp Literals
The timestamp is the time point at which data is produced. It includes
absolute timestamps and relative timestamps in IoTDB. For information about
timestamp support in IoTDB, see [Data Type Doc](../Data-Concept/Data-Type.md).
Specially, `NOW()` represents a constant timestamp that indicates the system
time at which the statement began to execute.
-## Boolean Literals
+### Boolean Literals
The constants `TRUE` and `FALSE` evaluate to 1 and 0, respectively. The
constant names can be written in any lettercase.
-## NULL Values
+### NULL Values
The `NULL` value means “no data.” `NULL` can be written in any lettercase.
-# Identifiers
+## Identifiers
-## Usage scenarios
+### Usage scenarios
Certain objects within IoTDB, including `TRIGGER`, `FUNCTION`(UDF),
`CONTINUOUS QUERY`, `SCHEMA TEMPLATE`, `USER`, `ROLE`,`Pipe`,`PipeSink`,`alias`
and other object names are known as identifiers.
-## Constraints
+### Constraints
Below are basic constraints of identifiers, specific identifiers may have
other constraints, for example, `user` should consists of more than 4
characters.
@@ -182,7 +182,7 @@ Below are basic constraints of identifiers, specific
identifiers may have other
- Identifier contains special characters.
- Identifier that is a real number.
-## How to use quotations marks in quoted identifiers
+### How to use quotations marks in quoted identifiers
`'` and `"` can be used directly in quoted identifiers.
@@ -198,7 +198,7 @@ create schema template `t1``t`
(temperature FLOAT encoding=RLE, status BOOLEAN encoding=PLAIN
compression=SNAPPY)
```
-## Examples
+### Examples
Examples of case in which quoted identifier is used :
@@ -279,11 +279,11 @@ Examples of case in which quoted identifier is used :
- The key/value of an attribute can be String Literal and identifier, more
details can be found at **key-value pair** part.
-# Node Names in Path
+## Node Names in Path
Node name is a special identifier, it can also be wildcard `*` and `**`. When
creating timeseries, node name can not be wildcard. In query statment, you can
use wildcard to match one or more nodes of path.
-## Wildcard
+### Wildcard
`*` represents one node. For example, `root.vehicle.*.sensor1` represents a
4-node path which is prefixed with `root.vehicle` and suffixed with `sensor1`.
@@ -315,7 +315,7 @@ select a*b from root.sg
|Time|root.sg.a * root.sg.b|
```
-## Identifier
+### Identifier
When node name is not wildcard, it is a identifier, which means the
constraints on it is the same as described in Identifier part.
@@ -378,7 +378,7 @@ Results:
+-----------------------------+-----------+
```
-# Key-Value Pair
+## Key-Value Pair
**The key/value of an attribute can be constant(including string) and
identifier. **
@@ -477,13 +477,13 @@ CREATE PIPE my_pipe TO my_iotdb FROM
(select ** from root WHERE time>=yyyy-mm-dd HH:MM:SS) WITH 'SyncDelOp' = 'true'
```
-# Keywords and Reserved Words
+## Keywords and Reserved Words
Keywords are words that have significance in SQL. Keywords can be used as an
identifier. Certain keywords, such as TIME/TIMESTAMP and ROOT, are reserved and
cannot use as identifiers.
[Keywords and Reserved Words](Keywords.md) shows the keywords and reserved
words in IoTDB.
-# Session、TsFile API
+## Session、TsFile API
When using the Session and TsFile APIs, if the method you call requires
parameters such as measurement, device, storage group, path in the form of
String, **please ensure that the parameters passed in the input string is the
same as when using the SQL statement**, here are some examples to help you
understand. Code example could be found at:
`example/session/src/main/java/org/apache/iotdb/SyntaxConventionRelatedExample.java`
@@ -582,7 +582,7 @@ String[] paths = new String[]{"root.sg.a",
"root.sg.`a.``\"b`", "root.sg.`111`"}
List<String> pathList = Arrays.asList(paths);
```
-# Detailed Definitions of Lexical and Grammar
+## Detailed Definitions of Lexical and Grammar
Please read the lexical and grammar description files in our code repository:
diff --git a/docs/zh/UserGuide/Reference/Syntax-Conventions.md
b/docs/zh/UserGuide/Reference/Syntax-Conventions.md
index 6a94023f52..a942ce8da5 100644
--- a/docs/zh/UserGuide/Reference/Syntax-Conventions.md
+++ b/docs/zh/UserGuide/Reference/Syntax-Conventions.md
@@ -20,11 +20,11 @@
-->
-# 字面值常量
+## 字面值常量
该部分对 IoTDB 中支持的字面值常量进行说明,包括字符串常量、数值型常量、时间戳常量、布尔型常量和空值。
-## 字符串常量
+### 字符串常量
> 我们参照了 MySQL 对 字符串的定义:A string is a sequence of bytes or characters, enclosed
> within either single quote (`'`) or double quote (`"`) characters.
@@ -37,7 +37,7 @@ MySQL 对字符串的定义可以参考:[MySQL :: MySQL 8.0 Reference Manual :
"another string"
```
-### 使用场景
+#### 使用场景
- `INSERT` 或者 `SELECT` 中用于表达 `TEXT` 类型数据的场景。
@@ -108,7 +108,7 @@ MySQL 对字符串的定义可以参考:[MySQL :: MySQL 8.0 Reference Manual :
- 用于表示键值对,键值对的键和值可以被定义成常量(包括字符串)或者标识符,具体请参考键值对章节。
-### 如何在字符串内使用引号
+#### 如何在字符串内使用引号
- 在单引号引起的字符串内,双引号无需特殊处理。同理,在双引号引起的字符串内,单引号无需特殊处理。
- 在单引号引起的字符串里,可以通过双写单引号来表示一个单引号,即单引号 ' 可以表示为 ''。
@@ -128,7 +128,7 @@ MySQL 对字符串的定义可以参考:[MySQL :: MySQL 8.0 Reference Manual :
"""string" // "string
```
-## 数值型常量
+### 数值型常量
数值型常量包括整型和浮点型。
@@ -140,27 +140,27 @@ MySQL 对字符串的定义可以参考:[MySQL :: MySQL 8.0 Reference Manual :
在浮点上下文中可以使用整数,它会被解释为等效的浮点数。
-## 时间戳常量
+### 时间戳常量
时间戳是一个数据到来的时间点,在 IoTDB 中分为绝对时间戳和相对时间戳。详细信息可参考
[数据类型文档](https://iotdb.apache.org/zh/UserGuide/Master/Data-Concept/Data-Type.html)。
特别地,`NOW()`表示语句开始执行时的服务端系统时间戳。
-## 布尔型常量
+### 布尔型常量
布尔值常量 `TRUE` 和 `FALSE` 分别等价于 `1` 和 `0`,它们对大小写不敏感。
-## 空值
+### 空值
`NULL`值表示没有数据。`NULL`对大小写不敏感。
-# 标识符
+## 标识符
-## 使用场景
+### 使用场景
在 IoTDB 中,触发器名称、UDF函数名、元数据模板名称、用户与角色名、连续查询标识、Pipe、PipeSink、键值对中的键和值、别名等可以作为标识符。
-## 约束
+### 约束
请注意,此处约束是标识符的通用约束,具体标识符可能还附带其它约束条件,如用户名限制字符数大于等于4,更严格的约束请参考具体标识符相关的说明文档。
@@ -181,7 +181,7 @@ MySQL 对字符串的定义可以参考:[MySQL :: MySQL 8.0 Reference Manual :
- 标识符包含不允许的特殊字符。
- 标识符为实数。
-## 如何在反引号引起的标识符中使用引号
+### 如何在反引号引起的标识符中使用引号
**在反引号引起的标识符中可以直接使用单引号和双引号。**
@@ -197,7 +197,7 @@ create schema template `t1't"t`
(temperature FLOAT encoding=RLE, status BOOLEAN encoding=PLAIN
compression=SNAPPY)
```
-## 特殊情况示例
+### 特殊情况示例
需要使用反引号进行引用的部分情况示例:
@@ -277,11 +277,11 @@ create schema template `t1't"t`
- 用于表示键值对,键值对的键和值可以被定义成常量(包括字符串)或者标识符,具体请参考键值对章节。
-# 路径结点名
+## 路径结点名
路径结点名是特殊的标识符,其还可以是通配符 \* 或 \*\*。在创建时间序列时,各层级的路径结点名不能为通配符 \* 或
\*\*。在查询语句中,可以用通配符 \* 或 \*\* 来表示路径结点名,以匹配一层或多层路径。
-## 通配符
+### 通配符
`*`在路径中表示一层。例如`root.vehicle.*.sensor1`代表的是以`root.vehicle`为前缀,以`sensor1`为后缀,层次等于
4 层的路径。
@@ -312,7 +312,7 @@ select a*b from root.sg
|Time|root.sg.a * root.sg.b|
```
-## 标识符
+### 标识符
路径结点名不为通配符时,使用方法和标识符一致。**在 SQL 中需要使用反引号引用的路径结点,在结果集中也会用反引号引起。**
@@ -377,7 +377,7 @@ select `111` from root.sg
+-----------------------------+-------------+
```
-# 键值对
+## 键值对
**键值对的键和值可以被定义为标识符或者常量。**
@@ -476,13 +476,13 @@ CREATE PIPE my_pipe TO my_iotdb FROM
(select ** from root WHERE time>=yyyy-mm-dd HH:MM:SS) WITH 'SyncDelOp' = 'true'
```
-# 关键字和保留字
+## 关键字和保留字
关键字是在 SQL 具有特定含义的词,可以作为标识符。保留字是关键字的一个子集,保留字不能用于标识符。
关于 IoTDB 的关键字和保留字列表,可以查看
[关键字和保留字](https://iotdb.apache.org/zh/UserGuide/Master/Reference/Keywords.html)
。
-# Session、TsFile API
+## Session、TsFile API
在使用Session、TsFIle
API时,如果您调用的方法需要以字符串形式传入物理量(measurement)、设备(device)、存储组(storage
group)、路径(path)等参数,**请保证所传入字符串与使用 SQL
语句时的写法一致**,下面是一些帮助您理解的例子。具体代码示例可以参考:`example/session/src/main/java/org/apache/iotdb/SyntaxConventionRelatedExample.java`
@@ -581,7 +581,7 @@ String[] paths = new String[]{"root.sg.a",
"root.sg.`a.``\"b`", "root.sg.`111`"}
List<String> pathList = Arrays.asList(paths);
```
-# 词法与文法详细定义
+## 词法与文法详细定义
请阅读代码仓库中的词法和语法描述文件:
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
index cdd891c66b..5c37286c78 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
@@ -170,48 +170,13 @@ public class SinkHandle implements ISinkHandle {
throw new UnsupportedOperationException();
}
- private void sendEndOfDataBlockEvent() throws Exception {
- logger.debug("[NotifyNoMoreTsBlock]");
- int attempt = 0;
- TEndOfDataBlockEvent endOfDataBlockEvent =
- new TEndOfDataBlockEvent(
- remoteFragmentInstanceId,
- remotePlanNodeId,
- localFragmentInstanceId,
- nextSequenceId - 1);
- while (attempt < MAX_ATTEMPT_TIMES) {
- attempt += 1;
- try (SyncDataNodeMPPDataExchangeServiceClient client =
- mppDataExchangeServiceClientManager.borrowClient(remoteEndpoint)) {
- client.onEndOfDataBlockEvent(endOfDataBlockEvent);
- break;
- } catch (Throwable e) {
- logger.error("Failed to send end of data block event, attempt times:
{}", attempt, e);
- if (attempt == MAX_ATTEMPT_TIMES) {
- throw e;
- }
- Thread.sleep(retryIntervalInMs);
- }
- }
- }
-
@Override
public synchronized void setNoMoreTsBlocks() {
logger.debug("[StartSetNoMoreTsBlocks]");
if (aborted || closed) {
return;
}
- try {
- sendEndOfDataBlockEvent();
- } catch (Exception e) {
- throw new RuntimeException("Send EndOfDataBlockEvent failed", e);
- }
- noMoreTsBlocks = true;
-
- if (isFinished()) {
- sinkHandleListener.onFinish(this);
- }
- sinkHandleListener.onEndOfBlocks(this);
+ executorService.submit(new SendEndOfDataBlockEventTask());
}
@Override
@@ -413,4 +378,51 @@ public class SinkHandle implements ISinkHandle {
}
}
}
+
+ /**
+ * Send a {@link org.apache.iotdb.mpp.rpc.thrift.TEndOfDataBlockEvent} to
downstream fragment
+ * instance.
+ */
+ class SendEndOfDataBlockEventTask implements Runnable {
+
+ @Override
+ public void run() {
+ try (SetThreadName sinkHandleName = new SetThreadName(threadName)) {
+ logger.debug("[NotifyNoMoreTsBlock]");
+ int attempt = 0;
+ TEndOfDataBlockEvent endOfDataBlockEvent =
+ new TEndOfDataBlockEvent(
+ remoteFragmentInstanceId,
+ remotePlanNodeId,
+ localFragmentInstanceId,
+ nextSequenceId - 1);
+ while (attempt < MAX_ATTEMPT_TIMES) {
+ attempt += 1;
+ try (SyncDataNodeMPPDataExchangeServiceClient client =
+
mppDataExchangeServiceClientManager.borrowClient(remoteEndpoint)) {
+ client.onEndOfDataBlockEvent(endOfDataBlockEvent);
+ break;
+ } catch (Throwable e) {
+ logger.error("Failed to send end of data block event, attempt
times: {}", attempt, e);
+ if (attempt == MAX_ATTEMPT_TIMES) {
+ logger.error("Failed to send end of data block event after all
retry", e);
+ sinkHandleListener.onFailure(SinkHandle.this, e);
+ return;
+ }
+ try {
+ Thread.sleep(retryIntervalInMs);
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ sinkHandleListener.onFailure(SinkHandle.this, e);
+ }
+ }
+ }
+ noMoreTsBlocks = true;
+ if (isFinished()) {
+ sinkHandleListener.onFinish(SinkHandle.this);
+ }
+ sinkHandleListener.onEndOfBlocks(SinkHandle.this);
+ }
+ }
+ }
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java
index 411370c14a..5b194b81d7 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java
@@ -145,7 +145,13 @@ public class SinkHandleTest {
Assert.assertTrue(sinkHandle.isFull().isDone());
Assert.assertFalse(sinkHandle.isFinished());
Assert.assertFalse(sinkHandle.isAborted());
- Mockito.verify(mockSinkHandleListener,
Mockito.times(1)).onEndOfBlocks(sinkHandle);
+ try {
+ Thread.sleep(500L);
+ Mockito.verify(mockSinkHandleListener,
Mockito.times(1)).onEndOfBlocks(sinkHandle);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
// Ack tsblocks.
sinkHandle.acknowledgeTsBlock(0, numOfMockTsBlock);
@@ -311,7 +317,14 @@ public class SinkHandleTest {
sinkHandle.setNoMoreTsBlocks();
Assert.assertFalse(sinkHandle.isFinished());
Assert.assertFalse(sinkHandle.isAborted());
- Mockito.verify(mockSinkHandleListener,
Mockito.times(1)).onEndOfBlocks(sinkHandle);
+ try {
+ Thread.sleep(500L);
+ Mockito.verify(mockSinkHandleListener,
Mockito.times(1)).onEndOfBlocks(sinkHandle);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+
try {
Thread.sleep(100L);
Mockito.verify(mockClient, Mockito.times(1))
@@ -439,12 +452,13 @@ public class SinkHandleTest {
// Close the SinkHandle.
try {
sinkHandle.setNoMoreTsBlocks();
- Assert.fail("Expect an RuntimeException.");
- } catch (RuntimeException e) {
- Assert.assertEquals("Send EndOfDataBlockEvent failed", e.getMessage());
+ Assert.assertFalse(sinkHandle.isAborted());
+ Thread.sleep(500L);
+ Mockito.verify(mockSinkHandleListener,
Mockito.times(0)).onEndOfBlocks(sinkHandle);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ Assert.fail();
}
- Assert.assertFalse(sinkHandle.isAborted());
- Mockito.verify(mockSinkHandleListener,
Mockito.times(0)).onEndOfBlocks(sinkHandle);
// Abort the SinkHandle.
sinkHandle.abort();