This is an automated email from the ASF dual-hosted git repository.

critas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new c059bcc2068 Feature add support for query, nonQuery, and insertTablet interfaces for table models in the REST service (#14165)
c059bcc2068 is described below

commit c059bcc2068403c8faac96885ba2775804076511
Author: CloudWise-Lukemiao <[email protected]>
AuthorDate: Mon Dec 23 17:38:53 2024 +0800

    Feature add support for query, nonQuery, and insertTablet interfaces for table models in the REST service (#14165)
    
    * Feature add support for query, nonQuery, and insertTablet interfaces for table models in the REST service.
    
    * Feature add support for query, nonQuery, and insertTablet interfaces for table models in the REST service.
    
    * Feature add support for query, nonQuery, and insertTablet interfaces for table models in the REST service.
    
    * Remove useless code
    
    * Remove useless code
    
    * Fix IT failure issue
    
    * Fix IT failure issue
    
    * Fix IT failure issue
    
    * Fix IT failure issue
    
    * Fix IT failure issue
    
    * add example
---
 .../java/org/apache/iotdb/TableHttpExample.java    | 223 ++++++++++
 .../java/org/apache/iotdb/TableHttpsExample.java   | 223 ++++++++++
 .../it/rest/it/IoTDBRestServiceCaseWhenThenIT.java | 492 +++++++++++++++++++++
 .../it/rest/it/IoTDBRestServiceFlushQueryIT.java   | 311 +++++++++++++
 .../relational/it/rest/it/IoTDBRestServiceIT.java  | 350 +++++++++++++++
 .../it/IoTDBRestServiceInsertAlignedValuesIT.java  | 366 +++++++++++++++
 .../iotdb/relational/it/rest/it/RestUtils.java     | 128 ++++++
 .../rest/table/v1/handler/ExceptionHandler.java    |  91 ++++
 .../table/v1/handler/ExecuteStatementHandler.java  |  84 ++++
 .../rest/table/v1/handler/QueryDataSetHandler.java | 219 +++++++++
 .../table/v1/handler/RequestValidationHandler.java | 100 +++++
 .../v1/handler/StatementConstructionHandler.java   | 188 ++++++++
 .../rest/table/v1/impl/RestApiServiceImpl.java     | 293 ++++++++++++
 iotdb-protocol/openapi/pom.xml                     |  27 ++
 .../src/main/openapi3/iotdb_rest_table_v1.yaml     | 167 +++++++
 15 files changed, 3262 insertions(+)

diff --git a/example/rest-java-example/src/main/java/org/apache/iotdb/TableHttpExample.java b/example/rest-java-example/src/main/java/org/apache/iotdb/TableHttpExample.java
new file mode 100644
index 00000000000..fb223ae7b91
--- /dev/null
+++ b/example/rest-java-example/src/main/java/org/apache/iotdb/TableHttpExample.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb;
+
+import com.google.gson.JsonParser;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.util.EntityUtils;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+
+public class TableHttpExample {
+
+  private static final String UTF8 = "utf-8";
+
+  private String getAuthorization(String username, String password) {
+    return Base64.getEncoder()
+        .encodeToString((username + ":" + password).getBytes(StandardCharsets.UTF_8));
+  }
+
+  public static void main(String[] args) {
+    TableHttpExample httpExample = new TableHttpExample();
+    httpExample.ping();
+    httpExample.createDatabase();
+    httpExample.createTable();
+    httpExample.nonQuery();
+    httpExample.insertTablet();
+    httpExample.query();
+  }
+
+  public void ping() {
+    CloseableHttpClient httpClient = SSLClient.getInstance().getHttpClient();
+    HttpGet httpGet = new HttpGet("http://127.0.0.1:18080/ping");
+    CloseableHttpResponse response = null;
+    try {
+      response = httpClient.execute(httpGet);
+      HttpEntity responseEntity = response.getEntity();
+      String message = EntityUtils.toString(responseEntity, UTF8);
+      String result = JsonParser.parseString(message).getAsJsonObject().toString();
+      System.out.println(result);
+    } catch (IOException e) {
+      e.printStackTrace();
+      System.out.println("The ping rest api failed");
+    } finally {
+      try {
+        httpClient.close();
+        if (response != null) {
+          response.close();
+        }
+      } catch (IOException e) {
+        e.printStackTrace();
+        System.out.println("Http Client close error");
+      }
+    }
+  }
+
+  private HttpPost getHttpPost(String url) {
+    HttpPost httpPost = new HttpPost(url);
+    httpPost.addHeader("Content-type", "application/json; charset=utf-8");
+    httpPost.setHeader("Accept", "application/json");
+    String authorization = getAuthorization("root", "root");
+    httpPost.setHeader("Authorization", authorization);
+    return httpPost;
+  }
+
+  public void insertTablet() {
+    CloseableHttpClient httpClient = SSLClient.getInstance().getHttpClient();
+    CloseableHttpResponse response = null;
+    try {
+      HttpPost httpPost = getHttpPost("http://127.0.0.1:18080/rest/table/v1/insertTablet");
+      String json =
+          "{\"database\":\"test\",\"column_types\":[\"ID\",\"ATTRIBUTE\",\"MEASUREMENT\"],\"timestamps\":[1635232143960,1635232153960,1635232163960,1635232173960,1635232183960],\"column_names\":[\"id1\",\"t1\",\"s1\"],\"data_types\":[\"STRING\",\"STRING\",\"FLOAT\"],\"values\":[[\"a11\",\"true\",11333],[\"a11\",\"false\",22333],[\"a13\",\"false1\",23333],[\"a14\",\"false2\",24],[\"a15\",\"false3\",25]],\"table\":\"sg211\"}";
+      httpPost.setEntity(new StringEntity(json, Charset.defaultCharset()));
+      response = httpClient.execute(httpPost);
+      HttpEntity responseEntity = response.getEntity();
+      String message = EntityUtils.toString(responseEntity, UTF8);
+      String result = JsonParser.parseString(message).getAsJsonObject().toString();
+      System.out.println(result);
+    } catch (IOException e) {
+      e.printStackTrace();
+      System.out.println("The insertTablet rest api failed");
+    } finally {
+      try {
+        if (response != null) {
+          response.close();
+        }
+      } catch (IOException e) {
+        e.printStackTrace();
+        System.out.println("Response close error");
+      }
+    }
+  }
+
+  public void nonQuery() {
+    CloseableHttpClient httpClient = SSLClient.getInstance().getHttpClient();
+    CloseableHttpResponse response = null;
+    try {
+      HttpPost httpPost = getHttpPost("http://127.0.0.1:18080/rest/table/v1/nonQuery");
+      String sql =
+          "{\"database\":\"test\",\"sql\":\"INSERT INTO sg211(time, id1, s1) values(100, 'd1', 0)\"}";
+      httpPost.setEntity(new StringEntity(sql, Charset.defaultCharset()));
+      response = httpClient.execute(httpPost);
+      HttpEntity responseEntity = response.getEntity();
+      String message = EntityUtils.toString(responseEntity, UTF8);
+      System.out.println(JsonParser.parseString(message).getAsJsonObject().toString());
+    } catch (IOException e) {
+      e.printStackTrace();
+      System.out.println("The non query rest api failed");
+    } finally {
+      try {
+        if (response != null) {
+          response.close();
+        }
+      } catch (IOException e) {
+        e.printStackTrace();
+        System.out.println("Response close error");
+      }
+    }
+  }
+
+  public void createDatabase() {
+    CloseableHttpClient httpClient = SSLClient.getInstance().getHttpClient();
+    CloseableHttpResponse response = null;
+    try {
+      HttpPost httpPost = getHttpPost("http://127.0.0.1:18080/rest/table/v1/nonQuery");
+      String sql = "{\"database\":\"\",\"sql\":\"create database if not exists test\"}";
+      httpPost.setEntity(new StringEntity(sql, Charset.defaultCharset()));
+      response = httpClient.execute(httpPost);
+      HttpEntity responseEntity = response.getEntity();
+      String message = EntityUtils.toString(responseEntity, UTF8);
+      System.out.println(JsonParser.parseString(message).getAsJsonObject().toString());
+    } catch (IOException e) {
+      e.printStackTrace();
+      System.out.println("The non query rest api failed");
+    } finally {
+      try {
+        if (response != null) {
+          response.close();
+        }
+      } catch (IOException e) {
+        e.printStackTrace();
+        System.out.println("Response close error");
+      }
+    }
+  }
+
+  public void createTable() {
+    CloseableHttpClient httpClient = SSLClient.getInstance().getHttpClient();
+    CloseableHttpResponse response = null;
+    try {
+      HttpPost httpPost = getHttpPost("http://127.0.0.1:18080/rest/table/v1/nonQuery");
+      String sql =
+          "{\"database\":\"test\",\"sql\":\"create table sg211 (id1 string id,t1 STRING ATTRIBUTE, s1 FLOAT measurement)\"}";
+      httpPost.setEntity(new StringEntity(sql, Charset.defaultCharset()));
+      response = httpClient.execute(httpPost);
+      HttpEntity responseEntity = response.getEntity();
+      String message = EntityUtils.toString(responseEntity, UTF8);
+      System.out.println(JsonParser.parseString(message).getAsJsonObject().toString());
+    } catch (IOException e) {
+      e.printStackTrace();
+      System.out.println("create table failed");
+    } finally {
+      try {
+        if (response != null) {
+          response.close();
+        }
+      } catch (IOException e) {
+        e.printStackTrace();
+        System.out.println("Response close error");
+      }
+    }
+  }
+
+  public void query() {
+    CloseableHttpClient httpClient = SSLClient.getInstance().getHttpClient();
+    CloseableHttpResponse response = null;
+    try {
+      HttpPost httpPost = getHttpPost("http://127.0.0.1:18080/rest/table/v1/query");
+      String sql = "{\"database\":\"test\",\"sql\":\"select * from sg211\"}";
+      httpPost.setEntity(new StringEntity(sql, Charset.defaultCharset()));
+      response = httpClient.execute(httpPost);
+      HttpEntity responseEntity = response.getEntity();
+      String message = EntityUtils.toString(responseEntity, UTF8);
+      System.out.println(JsonParser.parseString(message).getAsJsonObject().toString());
+    } catch (IOException e) {
+      e.printStackTrace();
+      System.out.println("The query rest api failed");
+    } finally {
+      try {
+        if (response != null) {
+          response.close();
+        }
+      } catch (IOException e) {
+        e.printStackTrace();
+        System.out.println("Response close error");
+      }
+    }
+  }
+}
diff --git a/example/rest-java-example/src/main/java/org/apache/iotdb/TableHttpsExample.java b/example/rest-java-example/src/main/java/org/apache/iotdb/TableHttpsExample.java
new file mode 100644
index 00000000000..aa5e4afd76d
--- /dev/null
+++ b/example/rest-java-example/src/main/java/org/apache/iotdb/TableHttpsExample.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb;
+
+import com.google.gson.JsonParser;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.util.EntityUtils;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+
+public class TableHttpsExample {
+
+  private static final String UTF8 = "utf-8";
+
+  private String getAuthorization(String username, String password) {
+    return Base64.getEncoder()
+        .encodeToString((username + ":" + password).getBytes(StandardCharsets.UTF_8));
+  }
+
+  public static void main(String[] args) {
+    TableHttpsExample httpExample = new TableHttpsExample();
+    httpExample.ping();
+    httpExample.createDatabase();
+    httpExample.createTable();
+    httpExample.nonQuery();
+    httpExample.insertTablet();
+    httpExample.query();
+  }
+
+  public void ping() {
+    CloseableHttpClient httpClient = SSLClient.getInstance().getHttpClient();
+    HttpGet httpGet = new HttpGet("https://127.0.0.1:18080/ping");
+    CloseableHttpResponse response = null;
+    try {
+      response = httpClient.execute(httpGet);
+      HttpEntity responseEntity = response.getEntity();
+      String message = EntityUtils.toString(responseEntity, UTF8);
+      String result = JsonParser.parseString(message).getAsJsonObject().toString();
+      System.out.println(result);
+    } catch (IOException e) {
+      e.printStackTrace();
+      System.out.println("The ping rest api failed");
+    } finally {
+      try {
+        httpClient.close();
+        if (response != null) {
+          response.close();
+        }
+      } catch (IOException e) {
+        e.printStackTrace();
+        System.out.println("Http Client close error");
+      }
+    }
+  }
+
+  private HttpPost getHttpPost(String url) {
+    HttpPost httpPost = new HttpPost(url);
+    httpPost.addHeader("Content-type", "application/json; charset=utf-8");
+    httpPost.setHeader("Accept", "application/json");
+    String authorization = getAuthorization("root", "root");
+    httpPost.setHeader("Authorization", authorization);
+    return httpPost;
+  }
+
+  public void insertTablet() {
+    CloseableHttpClient httpClient = SSLClient.getInstance().getHttpClient();
+    CloseableHttpResponse response = null;
+    try {
+      HttpPost httpPost = getHttpPost("https://127.0.0.1:18080/rest/table/v1/insertTablet");
+      String json =
+          "{\"database\":\"test\",\"column_types\":[\"ID\",\"ATTRIBUTE\",\"MEASUREMENT\"],\"timestamps\":[1635232143960,1635232153960,1635232163960,1635232173960,1635232183960],\"column_names\":[\"id1\",\"t1\",\"s1\"],\"data_types\":[\"STRING\",\"STRING\",\"FLOAT\"],\"values\":[[\"a11\",\"true\",11333],[\"a11\",\"false\",22333],[\"a13\",\"false1\",23333],[\"a14\",\"false2\",24],[\"a15\",\"false3\",25]],\"table\":\"sg211\"}";
+      httpPost.setEntity(new StringEntity(json, Charset.defaultCharset()));
+      response = httpClient.execute(httpPost);
+      HttpEntity responseEntity = response.getEntity();
+      String message = EntityUtils.toString(responseEntity, UTF8);
+      String result = JsonParser.parseString(message).getAsJsonObject().toString();
+      System.out.println(result);
+    } catch (IOException e) {
+      e.printStackTrace();
+      System.out.println("The insertTablet rest api failed");
+    } finally {
+      try {
+        if (response != null) {
+          response.close();
+        }
+      } catch (IOException e) {
+        e.printStackTrace();
+        System.out.println("Response close error");
+      }
+    }
+  }
+
+  public void nonQuery() {
+    CloseableHttpClient httpClient = SSLClient.getInstance().getHttpClient();
+    CloseableHttpResponse response = null;
+    try {
+      HttpPost httpPost = getHttpPost("https://127.0.0.1:18080/rest/table/v1/nonQuery");
+      String sql =
+          "{\"database\":\"test\",\"sql\":\"INSERT INTO sg211(time, id1, s1) values(100, 'd1', 0)\"}";
+      httpPost.setEntity(new StringEntity(sql, Charset.defaultCharset()));
+      response = httpClient.execute(httpPost);
+      HttpEntity responseEntity = response.getEntity();
+      String message = EntityUtils.toString(responseEntity, UTF8);
+      System.out.println(JsonParser.parseString(message).getAsJsonObject().toString());
+    } catch (IOException e) {
+      e.printStackTrace();
+      System.out.println("The non query rest api failed");
+    } finally {
+      try {
+        if (response != null) {
+          response.close();
+        }
+      } catch (IOException e) {
+        e.printStackTrace();
+        System.out.println("Response close error");
+      }
+    }
+  }
+
+  public void createDatabase() {
+    CloseableHttpClient httpClient = SSLClient.getInstance().getHttpClient();
+    CloseableHttpResponse response = null;
+    try {
+      HttpPost httpPost = getHttpPost("https://127.0.0.1:18080/rest/table/v1/nonQuery");
+      String sql = "{\"database\":\"\",\"sql\":\"create database if not exists test\"}";
+      httpPost.setEntity(new StringEntity(sql, Charset.defaultCharset()));
+      response = httpClient.execute(httpPost);
+      HttpEntity responseEntity = response.getEntity();
+      String message = EntityUtils.toString(responseEntity, UTF8);
+      System.out.println(JsonParser.parseString(message).getAsJsonObject().toString());
+    } catch (IOException e) {
+      e.printStackTrace();
+      System.out.println("The non query rest api failed");
+    } finally {
+      try {
+        if (response != null) {
+          response.close();
+        }
+      } catch (IOException e) {
+        e.printStackTrace();
+        System.out.println("Response close error");
+      }
+    }
+  }
+
+  public void createTable() {
+    CloseableHttpClient httpClient = SSLClient.getInstance().getHttpClient();
+    CloseableHttpResponse response = null;
+    try {
+      HttpPost httpPost = getHttpPost("https://127.0.0.1:18080/rest/table/v1/nonQuery");
+      String sql =
+          "{\"database\":\"test\",\"sql\":\"create table sg211 (id1 string id,t1 STRING ATTRIBUTE, s1 FLOAT measurement)\"}";
+      httpPost.setEntity(new StringEntity(sql, Charset.defaultCharset()));
+      response = httpClient.execute(httpPost);
+      HttpEntity responseEntity = response.getEntity();
+      String message = EntityUtils.toString(responseEntity, UTF8);
+      System.out.println(JsonParser.parseString(message).getAsJsonObject().toString());
+    } catch (IOException e) {
+      e.printStackTrace();
+      System.out.println("create table failed");
+    } finally {
+      try {
+        if (response != null) {
+          response.close();
+        }
+      } catch (IOException e) {
+        e.printStackTrace();
+        System.out.println("Response close error");
+      }
+    }
+  }
+
+  public void query() {
+    CloseableHttpClient httpClient = SSLClient.getInstance().getHttpClient();
+    CloseableHttpResponse response = null;
+    try {
+      HttpPost httpPost = getHttpPost("https://127.0.0.1:18080/rest/table/v1/query");
+      String sql = "{\"database\":\"test\",\"sql\":\"select * from sg211\"}";
+      httpPost.setEntity(new StringEntity(sql, Charset.defaultCharset()));
+      response = httpClient.execute(httpPost);
+      HttpEntity responseEntity = response.getEntity();
+      String message = EntityUtils.toString(responseEntity, UTF8);
+      System.out.println(JsonParser.parseString(message).getAsJsonObject().toString());
+    } catch (IOException e) {
+      e.printStackTrace();
+      System.out.println("The query rest api failed");
+    } finally {
+      try {
+        if (response != null) {
+          response.close();
+        }
+      } catch (IOException e) {
+        e.printStackTrace();
+        System.out.println("Response close error");
+      }
+    }
+  }
+}
diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/rest/it/IoTDBRestServiceCaseWhenThenIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/rest/it/IoTDBRestServiceCaseWhenThenIT.java
new file mode 100644
index 00000000000..4e48015e24e
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/rest/it/IoTDBRestServiceCaseWhenThenIT.java
@@ -0,0 +1,492 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.relational.it.rest.it;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+import org.apache.iotdb.itbase.category.RemoteIT;
+import org.apache.iotdb.itbase.env.BaseEnv;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.util.EntityUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class, RemoteIT.class})
+public class IoTDBRestServiceCaseWhenThenIT {
+
+  private int port = 18080;
+  private CloseableHttpClient httpClient = null;
+
+  @Before
+  public void setUp() throws Exception {
+    BaseEnv baseEnv = EnvFactory.getEnv();
+    baseEnv.getConfig().getDataNodeConfig().setEnableRestService(true);
+    baseEnv.initClusterEnvironment();
+    DataNodeWrapper portConflictDataNodeWrapper = EnvFactory.getEnv().getDataNodeWrapper(0);
+    port = portConflictDataNodeWrapper.getRestServicePort();
+    httpClient = HttpClientBuilder.create().build();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    try {
+      if (httpClient != null) {
+        httpClient.close();
+      }
+    } catch (IOException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+    EnvFactory.getEnv().cleanClusterEnvironment();
+  }
+
+  private static final String DATABASE = "test";
+
+  private static final String[] expectedHeader = {"_col0"};
+
+  private static final String[] SQLs =
+      new String[] {
+        // normal cases
+        "CREATE DATABASE " + DATABASE,
+        "CREATE table table1 (device_id STRING ID, s1 INT32 MEASUREMENT, s5 BOOLEAN MEASUREMENT, s6 TEXT MEASUREMENT)",
+        "CREATE table table2 (device_id STRING ID, s3 FLOAT MEASUREMENT, s4 DOUBLE MEASUREMENT)",
+        "CREATE table table3 (device_id STRING ID, s2 INT64 MEASUREMENT)",
+        "INSERT INTO table1(time, device_id, s1) values(100, 'd1', 0)",
+        "INSERT INTO table1(time, device_id, s1) values(200, 'd1', 11)",
+        "INSERT INTO table1(time, device_id, s1) values(300, 'd1', 22)",
+        "INSERT INTO table1(time, device_id, s1) values(400, 'd1', 33)",
+        "INSERT INTO table2(time, device_id, s3) values(100, 'd1', 0)",
+        "INSERT INTO table2(time, device_id, s3) values(200, 'd1', 11)",
+        "INSERT INTO table2(time, device_id, s3) values(300, 'd1', 22)",
+        "INSERT INTO table2(time, device_id, s3) values(400, 'd1', 33)",
+        "INSERT INTO table2(time, device_id, s4) values(100, 'd1', 44)",
+        "INSERT INTO table2(time, device_id, s4) values(200, 'd1', 55)",
+        "INSERT INTO table2(time, device_id, s4) values(300, 'd1', 66)",
+        "INSERT INTO table2(time, device_id, s4) values(400, 'd1', 77)",
+      };
+
+  @Test
+  public void test() {
+    ping();
+    prepareTableData();
+    testKind1Basic();
+    testKind2Basic();
+    testShortCircuitEvaluation();
+    testKind1InputTypeRestrict();
+    testKind2InputTypeRestrict();
+    testKind1OutputTypeRestrict();
+    testKind2OutputTypeRestrict();
+    testKind1UsedInOtherOperation();
+    testKind2UsedInOtherOperation();
+    testKind1UseOtherOperation();
+    testKind2UseOtherOperation();
+    testKind1UseInWhereClause();
+    testKind1CaseInCase();
+    testKind2CaseInCase();
+    testKind1Logic();
+    testKind2UseOtherOperation();
+    testKind1UseInWhereClause();
+  }
+
+  public void ping() {
+    HttpGet httpGet = new HttpGet("http://127.0.0.1:" + port + "/ping");
+    CloseableHttpResponse response = null;
+    try {
+      for (int i = 0; i < 30; i++) {
+        try {
+          response = httpClient.execute(httpGet);
+          break;
+        } catch (Exception e) {
+          if (i == 29) {
+            throw e;
+          }
+          try {
+            Thread.sleep(1000);
+          } catch (InterruptedException ex) {
+            throw new RuntimeException(ex);
+          }
+        }
+      }
+
+      HttpEntity responseEntity = response.getEntity();
+      String message = EntityUtils.toString(responseEntity, "utf-8");
+      JsonObject result = JsonParser.parseString(message).getAsJsonObject();
+      assertEquals(200, response.getStatusLine().getStatusCode());
+      assertEquals(200, Integer.parseInt(result.get("code").toString()));
+    } catch (IOException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    } finally {
+
+      try {
+        if (response != null) {
+          response.close();
+        }
+      } catch (IOException e) {
+        e.printStackTrace();
+        fail(e.getMessage());
+      }
+    }
+  }
+
+  public void testKind1Basic() {
+    String[] retArray = new String[] {"99,", "9999,", "9999,", "999,"};
+    tableResultSetEqualTest(
+        "select case when s1=0 then 99 when s1>22 then 999 else 9999 end from table1",
+        expectedHeader,
+        retArray,
+        DATABASE);
+  }
+
+  public void testKind2Basic() {
+    String sql = "select case s1 when 0 then 99 when 22 then 999 else 9999 end from table1";
+    String[] retArray = new String[] {"99,", "9999,", "999,", "9999,"};
+    tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE);
+
+    // without ELSE clause
+    sql = "select case s1 when 0 then 99 when 22 then 999 end from table1";
+    retArray = new String[] {"99,", "null,", "999,", "null,"};
+    tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE);
+  }
+
+  public void testShortCircuitEvaluation() {
+    String[] retArray = new String[] {"0,", "11,", "22,", "33,"};
+    tableResultSetEqualTest(
+        "select case when 1=0 then s1/0 when 1!=0 then s1 end from table1",
+        expectedHeader,
+        retArray,
+        DATABASE);
+  }
+
+  public void testKind1InputTypeRestrict() {
+    // WHEN clause must return BOOLEAN
+    String sql = "select case when s1+1 then 20 else 22 end from table1";
+    String msg = "701: CASE WHEN clause must evaluate to a BOOLEAN (actual: INT32)";
+    tableAssertTestFail(sql, msg, DATABASE);
+  }
+
+  public void testKind2InputTypeRestrict() {
+    // the expression in CASE clause must be able to be equated with the expression in WHEN clause
+    String sql = "select case s1 when '1' then 20 else 22 end from table1";
+    String msg = "701: CASE operand type does not match WHEN clause operand type: INT32 vs STRING";
+    tableAssertTestFail(sql, msg, DATABASE);
+  }
+
+  public void testKind1OutputTypeRestrict() {
+    // BOOLEAN and other types cannot exist at the same time
+    String[] retArray = new String[] {"true,", "false,", "true,", "true,"};
+    // success
+    tableResultSetEqualTest(
+        "select case when s1<=0 then true when s1=11 then false else true end from table1",
+        expectedHeader,
+        retArray,
+        DATABASE);
+    // fail
+    tableAssertTestFail(
+        "select case when s1<=0 then true else 22 end from table1",
+        "701: All CASE results must be the same type or coercible to a common type. Cannot find common type between BOOLEAN and INT32, all types (without duplicates): [BOOLEAN, INT32]",
+        DATABASE);
+
+    // TEXT and other types cannot exist at the same time
+    retArray = new String[] {"good,", "bad,", "okok,", "okok,"};
+    // success
+    tableResultSetEqualTest(
+        "select case when s1<=0 then 'good' when s1=11 then 'bad' else 'okok' end from table1",
+        expectedHeader,
+        retArray,
+        DATABASE);
+    // fail
+    tableAssertTestFail(
+        "select case when s1<=0 then 'good' else 22 end from table1",
+        "701: All CASE results must be the same type or coercible to a common type. Cannot find common type between STRING and INT32, all types (without duplicates): [STRING, INT32]",
+        DATABASE);
+  }
+
+  public void testKind2OutputTypeRestrict() {
+    // BOOLEAN and other types cannot exist at the same time
+    String[] retArray =
+        new String[] {
+          "true,", "false,", "true,", "true,",
+        };
+    // success
+    tableResultSetEqualTest(
+        "select case s1 when 0 then true when 11 then false else true end from table1",
+        expectedHeader,
+        retArray,
+        DATABASE);
+    // fail
+    tableAssertTestFail(
+        "select case s1 when 0 then true else 22 end from table1",
+        "701: All CASE results must be the same type or coercible to a common type. Cannot find common type between BOOLEAN and INT32, all types (without duplicates): [BOOLEAN, INT32]",
+        DATABASE);
+
+    // TEXT and other types cannot exist at the same time
+    retArray = new String[] {"good,", "bad,", "okok,", "okok,"};
+    // success
+    tableResultSetEqualTest(
+        "select case s1 when 0 then 'good' when 11 then 'bad' else 'okok' end from table1",
+        expectedHeader,
+        retArray,
+        DATABASE);
+    // fail
+    tableAssertTestFail(
+        "select case s1 when 0 then 'good' else 22 end from table1",
+        "701: All CASE results must be the same type or coercible to a common type. Cannot find common type between STRING and INT32, all types (without duplicates): [STRING, INT32]",
+        DATABASE);
+  }
+
+  /**
+   * Uses a searched CASE expression ({@code CASE WHEN cond THEN ...}) as an operand of other
+   * expressions: a multiplication, an addition of two CASE expressions, and as the argument of
+   * {@code diff}.
+   */
+  public void testKind1UsedInOtherOperation() {
+    String sql;
+    String[] retArray;
+
+    // use in scalar operation
+
+    // multiply
+    sql = "select 2 * case when s1=0 then 99 when s1=22.0 then 999 else 9999 
end from table1";
+    retArray = new String[] {"198,", "19998,", "1998,", "19998,"};
+    tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE);
+
+    // add
+    sql =
+        "select "
+            + "case when s1=0 then 99 when s1=22.0 then 999 else 9999 end "
+            + "+"
+            + "case when s1=11 then 99 else 9999 end "
+            + "from table1";
+    retArray =
+        new String[] {
+          "10098,", "10098,", "10998,", "19998,",
+        };
+    tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE);
+
+    // function
+    sql = "select diff(case when s1=0 then 99 when s1>22 then 999 else 9999 
end) from table1";
+    // diff has no predecessor for the first row, hence the leading null
+    retArray = new String[] {"null,", "9900.0,", "0.0,", "-9000.0,"};
+    tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE);
+  }
+
+  /**
+   * Uses a simple CASE expression ({@code CASE s1 WHEN value THEN ...}) as an operand of a
+   * multiplication and as the argument of {@code diff}.
+   */
+  public void testKind2UsedInOtherOperation() {
+    String sql;
+    String[] retArray;
+
+    // use in scalar operation
+
+    // multiply
+    sql = "select 2 * case s1 when 0 then 99 when 22 then 999 else 9999 end 
from table1";
+    retArray = new String[] {"198,", "19998,", "1998,", "19998,"};
+    tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE);
+    // function
+    sql = "select diff(case s1 when 0 then 99 when 22 then 999 else 9999 end) 
from table1";
+    retArray =
+        new String[] {
+          "null,", "9900.0,", "-9000.0,", "9000.0,",
+        };
+    tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE);
+  }
+
+  /**
+   * Uses a scalar function ({@code sin}) inside the WHEN condition of a searched CASE expression.
+   *
+   * <p>The THEN/ELSE variant below is kept commented out because it relies on {@code align by
+   * device}, which the TODO marks as unsupported.
+   */
+  public void testKind1UseOtherOperation() {
+    // WHEN-clause use scalar function
+    String sql = "select case when sin(s1)>=0 then '>0' else '<0' end from 
table1";
+    String[] retArray =
+        new String[] {
+          ">0,", "<0,", "<0,", ">0,",
+        };
+    tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE);
+
+    // THEN-clause and ELSE-clause use scalar function
+
+    // TODO: align by is not supported.
+
+    //    sql =
+    //        "select case when s1<=11 then CAST(diff(s1) as TEXT) else 
CAST(s1-1 as TEXT) end from
+    // table1 align by device";
+    //
+    //    retArray =
+    //        new String[] {
+    //          "0,table1,null,",
+    //          "1000000,table1,11.0,",
+    //          "20000000,table1,21.0,",
+    //          "210000000,table1,32.0,",
+    //        };
+    //    tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE);
+  }
+
+  /**
+   * Uses a scalar function as the operand or as a WHEN value of a simple CASE expression.
+   *
+   * <p>Both active statements compare a DOUBLE with an INT32 and are expected to fail with error
+   * 701 (operand type mismatch). The result-set expectations and the remaining scenarios are kept
+   * only as commented-out code.
+   */
+  public void testKind2UseOtherOperation() {
+    // CASE-clause use scalar function
+    String sql =
+        "select case round(sin(s1)) when 0 then '=0' when -1 then '<0' else 
'>0' end from table1";
+
+    tableAssertTestFail(
+        sql,
+        "701: CASE operand type does not match WHEN clause operand type: 
DOUBLE vs INT32",
+        DATABASE);
+    //    String[] retArray =
+    //        new String[] {
+    //          "=0,", "<0,", ">0,", ">0,",
+    //        };
+    //    tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE);
+
+    // WHEN-clause use scalar function
+    sql = "select case 0 when sin(s1) then '=0' else '!=0' end from table1";
+    tableAssertTestFail(
+        sql,
+        "701: CASE operand type does not match WHEN clause operand type: INT32 
vs DOUBLE",
+        DATABASE);
+    //    retArray =
+    //        new String[] {
+    //          "=0,", "!=0,", "!=0,", "!=0,",
+    //        };
+    //    tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE);
+
+    // THEN-clause and ELSE-clause use scalar function
+    //    sql =
+    //        "select case s1 when 11 then CAST(diff(s1) as TEXT) else 
CAST(s1-1 as TEXT) end from
+    // table1 align by device";
+    //
+    //    retArray =
+    //        new String[] {
+    //          "table1,-1.0,", "table1,11.0,", "table1,21.0,", "table1,32.0,",
+    //        };
+    //    tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE);
+
+    // UDF is not allowed
+    //    sql = "select case s1 when 0 then change_points(s1) end from table1";
+    //    String msg = "301: CASE expression cannot be used with non-mappable 
UDF";
+    //    tableAssertTestFail(sql, msg, DATABASE);
+  }
+
+  /**
+   * Uses a boolean-valued searched CASE expression as the WHERE predicate of a query on table2,
+   * then selects the same expression directly to check its per-row value (including null when no
+   * branch matches).
+   */
+  public void testKind1UseInWhereClause() {
+    String sql =
+        "select s4 from table2 where case when s3=0 then s4>44 when s3=22 then 
s4>0 when time>300 then true end";
+    String[] retArray = new String[] {"66.0,", "77.0,"};
+    tableResultSetEqualTest(sql, new String[] {"s4"}, retArray, DATABASE);
+
+    sql =
+        "select case when s3=0 then s4>44 when s3=22 then s4>0 when time>300 
then true end from table2";
+    retArray =
+        new String[] {
+          "false,", "null,", "true,", "true,",
+        };
+    tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE);
+  }
+
+  /**
+   * Nests a searched CASE expression (cast to STRING) inside the THEN branch of another searched
+   * CASE expression and checks the per-row result.
+   */
+  public void testKind1CaseInCase() {
+    String sql =
+        "select case when s1=0 OR s1=22 then cast(case when s1=0 then 99 when 
s1>22 then 999 end as STRING) else 'xxx' end from table1";
+    String[] retArray =
+        new String[] {
+          "99,", "xxx,", "null,", "xxx,",
+        };
+    tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE);
+  }
+
+  /**
+   * Nests searched CASE expressions (cast to STRING) inside the THEN branches of a simple
+   * {@code CASE s1 WHEN ...} expression; the expected rows match those of the searched-form test
+   * {@code testKind1CaseInCase}.
+   */
+  public void testKind2CaseInCase() {
+    String sql =
+        "select case s1 when 0 then cast(case when s1=0 then 99 when s1>22 
then 999 end as STRING) when 22 then cast(case when s1=0 then 99 when s1>22 
then 999 end as STRING) else 'xxx' end from table1";
+    String[] retArray =
+        new String[] {
+          "99,", "xxx,", "null,", "xxx,",
+        };
+    tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE);
+  }
+
+  /**
+   * Checks a searched CASE expression whose WHEN conditions are conjunctions of range predicates
+   * on two columns of table2; rows matching no branch are expected to yield null.
+   */
+  public void testKind1Logic() {
+    String sql =
+        "select case when s3 >= 0 and s3 < 20 and s4 >= 50 and s4 < 60 then 
'just so so~~~' when s3 >= 20 and s3 < 40 and s4 >= 70 and s4 < 80 then 'very 
well~~~' end from table2";
+    String[] retArray = new String[] {"null,", "just so so~~~,", "null,", 
"very well~~~,"};
+    tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE);
+  }
+
+  /**
+   * Executes {@code sql} in {@code database} through {@link #query(String)} and asserts that the
+   * response reports the expected failure.
+   *
+   * @param sql statement that is expected to be rejected
+   * @param errMsg expected text in the form {@code "<code>: <message>"}
+   * @param database database name placed in the request
+   */
+  public void tableAssertTestFail(String sql, String errMsg, String database) {
+    JsonObject request = new JsonObject();
+    request.addProperty("database", database);
+    request.addProperty("sql", sql);
+    JsonObject response = query(request.toString());
+    String actual = response.get("code") + ": " + response.get("message").getAsString();
+    assertEquals(errMsg, actual);
+  }
+
+  /**
+   * Executes {@code sql} in {@code database} through {@link #query(String)} and compares the
+   * response with the expected header and rows.
+   *
+   * <p>Each returned row is rendered as its cell values, every one followed by a comma; a JSON
+   * null is rendered as {@code null}. That rendering is compared with the matching entry of
+   * {@code expectedRetArray}.
+   *
+   * @param sql statement to execute
+   * @param expectedHeader expected column names, in order
+   * @param expectedRetArray expected rows, in order, in the rendering described above
+   * @param database database name placed in the request
+   */
+  public void tableResultSetEqualTest(
+      String sql, String[] expectedHeader, String[] expectedRetArray, String database) {
+    JsonObject jsonObject = new JsonObject();
+    jsonObject.addProperty("database", database);
+    jsonObject.addProperty("sql", sql);
+    JsonObject result = query(jsonObject.toString());
+    JsonArray columnNames = result.get("column_names").getAsJsonArray();
+    JsonArray valuesList = result.get("values").getAsJsonArray();
+
+    // Compare sizes before indexing the expected arrays: an unexpected extra column or row must
+    // surface as an assertion failure, not as an ArrayIndexOutOfBoundsException.
+    assertEquals(expectedHeader.length, columnNames.size());
+    for (int i = 0; i < expectedHeader.length; i++) {
+      assertEquals(expectedHeader[i], columnNames.get(i).getAsString());
+    }
+
+    assertEquals(expectedRetArray.length, valuesList.size());
+    for (int i = 0; i < expectedRetArray.length; i++) {
+      StringBuilder builder = new StringBuilder();
+      JsonArray values = valuesList.get(i).getAsJsonArray();
+      for (int c = 0; c < values.size(); c++) {
+        if (values.get(c).isJsonNull()) {
+          // getAsString() is not usable on a JSON null; toString() renders it as "null"
+          builder.append(values.get(c).toString()).append(",");
+        } else {
+          builder.append(values.get(c).getAsString()).append(",");
+        }
+      }
+      assertEquals(expectedRetArray[i], builder.toString());
+    }
+  }
+
+  public void prepareTableData() {
+    for (int i = 0; i < SQLs.length; i++) {
+      JsonObject jsonObject = new JsonObject();
+      if (i > 0) {
+        jsonObject.addProperty("database", DATABASE);
+      } else {
+        jsonObject.addProperty("database", "");
+      }
+      jsonObject.addProperty("sql", SQLs[i]);
+      nonQuery(jsonObject.toString());
+    }
+  }
+
+  /**
+   * Delegates to {@code RestUtils.query} using this test's HTTP client and REST port.
+   *
+   * @param json request body
+   * @return the JSON object returned by {@code RestUtils.query}
+   */
+  public JsonObject query(String json) {
+    return RestUtils.query(httpClient, port, json);
+  }
+
+  /**
+   * Delegates to {@code RestUtils.nonQuery} using this test's HTTP client and REST port.
+   *
+   * @param json request body
+   * @return the JSON object returned by {@code RestUtils.nonQuery}
+   */
+  public JsonObject nonQuery(String json) {
+    return RestUtils.nonQuery(httpClient, port, json);
+  }
+}
diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/rest/it/IoTDBRestServiceFlushQueryIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/rest/it/IoTDBRestServiceFlushQueryIT.java
new file mode 100644
index 00000000000..0b37fe33500
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/rest/it/IoTDBRestServiceFlushQueryIT.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.relational.it.rest.it;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+import org.apache.iotdb.itbase.category.RemoteIT;
+import org.apache.iotdb.itbase.env.BaseEnv;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.util.EntityUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class, RemoteIT.class})
+public class IoTDBRestServiceFlushQueryIT {
+
+  private int port = 18080;
+  private CloseableHttpClient httpClient = null;
+
+  /**
+   * Enables the REST service in the data node configuration, starts the cluster environment,
+   * reads the REST port of data node 0 and creates the HTTP client used by the test.
+   */
+  @Before
+  public void setUp() throws Exception {
+    BaseEnv env = EnvFactory.getEnv();
+    env.getConfig().getDataNodeConfig().setEnableRestService(true);
+    env.initClusterEnvironment();
+    port = EnvFactory.getEnv().getDataNodeWrapper(0).getRestServicePort();
+    httpClient = HttpClientBuilder.create().build();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    try {
+      if (httpClient != null) {
+        httpClient.close();
+      }
+    } catch (IOException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+    EnvFactory.getEnv().cleanClusterEnvironment();
+  }
+
+  private static final String DATABASE = "test";
+
+  private static final String[] sqls =
+      new String[] {
+        "CREATE DATABASE test",
+        "CREATE TABLE vehicle (id1 string id, s0 int32 measurement)",
+        "insert into vehicle(id1,time,s0) values('d0',1,101)",
+        "insert into vehicle(id1,time,s0) values('d0',2,198)",
+        "insert into vehicle(id1,time,s0) values('d0',100,99)",
+        "insert into vehicle(id1,time,s0) values('d0',101,99)",
+        "insert into vehicle(id1,time,s0) values('d0',102,80)",
+        "insert into vehicle(id1,time,s0) values('d0',103,99)",
+        "insert into vehicle(id1,time,s0) values('d0',104,90)",
+        "insert into vehicle(id1,time,s0) values('d0',105,99)",
+        "insert into vehicle(id1,time,s0) values('d0',106,99)",
+        "flush",
+        "insert into vehicle(id1,time,s0) values('d0',2,10000)",
+        "insert into vehicle(id1,time,s0) values('d0',50,10000)",
+        "insert into vehicle(id1,time,s0) values('d0',1000,22222)",
+      };
+
+  /**
+   * Calls the REST {@code /ping} endpoint on 127.0.0.1 and asserts that both the HTTP status and
+   * the {@code code} field of the JSON body are 200.
+   *
+   * <p>The request is attempted up to 30 times with a one-second pause between attempts; the
+   * exception of the last attempt is rethrown. An {@link IOException} fails the test.
+   */
+  public void ping() {
+    HttpGet httpGet = new HttpGet("http://127.0.0.1:" + port + "/ping");
+    CloseableHttpResponse response = null;
+    try {
+      for (int i = 0; i < 30; i++) {
+        try {
+          response = httpClient.execute(httpGet);
+          break;
+        } catch (Exception e) {
+          if (i == 29) {
+            throw e;
+          }
+          try {
+            Thread.sleep(1000);
+          } catch (InterruptedException ex) {
+            // Restore the interrupt flag so the interruption stays observable to the caller.
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(ex);
+          }
+        }
+      }
+
+      HttpEntity responseEntity = response.getEntity();
+      String message = EntityUtils.toString(responseEntity, "utf-8");
+      JsonObject result = JsonParser.parseString(message).getAsJsonObject();
+      assertEquals(200, response.getStatusLine().getStatusCode());
+      assertEquals(200, Integer.parseInt(result.get("code").toString()));
+    } catch (IOException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    } finally {
+      try {
+        if (response != null) {
+          response.close();
+        }
+      } catch (IOException e) {
+        e.printStackTrace();
+        fail(e.getMessage());
+      }
+    }
+  }
+
+  /**
+   * Single JUnit entry point: pings the REST service, loads the fixture statements and then runs
+   * the scenario methods in the order listed below.
+   */
+  @Test
+  public void test() {
+    ping();
+    prepareTableData();
+    selectAllSQLTest();
+    testFlushGivenGroup();
+    testFlushGivenGroupNoData();
+  }
+
+  /**
+   * Builds the JSON request body for a statement.
+   *
+   * @param database value of the {@code database} property
+   * @param sql value of the {@code sql} property
+   * @return the serialized JSON object with the two properties, {@code database} first
+   */
+  public String sqlHandler(String database, String sql) {
+    JsonObject request = new JsonObject();
+    request.addProperty("database", database);
+    request.addProperty("sql", sql);
+    return request.toString();
+  }
+
+  /**
+   * Queries every row of {@code vehicle} in database {@code test} and walks over all returned
+   * cells.
+   *
+   * <p>NOTE(review): the loops only read the cells and assert nothing, so this method merely
+   * checks that the response has a {@code values} array whose entries are arrays.
+   */
+  public void selectAllSQLTest() {
+    String sql = sqlHandler("test", "SELECT * FROM vehicle");
+    JsonObject jsonObject = query(sql);
+    JsonArray valuesList = jsonObject.getAsJsonArray("values");
+    for (int i = 0; i < valuesList.size(); i++) {
+      JsonArray jsonArray = valuesList.get(i).getAsJsonArray();
+      for (int j = 0; j < jsonArray.size(); j++) {
+        jsonArray.get(j);
+      }
+    }
+  }
+
+  /**
+   * Interleaves inserts into three databases with the different FLUSH forms (no argument, named
+   * databases, named databases plus a boolean flag) and finally checks the row count of each
+   * {@code vehicle} table.
+   *
+   * <p>Timestamps 10..19 are written first and 0..29 afterwards, so every table is expected to
+   * hold 30 rows at the end.
+   */
+  public void testFlushGivenGroup() {
+    List<String> list =
+        Arrays.asList("CREATE DATABASE group1", "CREATE DATABASE group2", "CREATE DATABASE group3");
+    for (String sql : list) {
+      nonQuery(sqlHandler("", sql));
+    }
+
+    String createTable =
+        "CREATE TABLE vehicle (id1 string id, s1 int32 measurement, s2 float measurement, "
+            + "s3 string measurement)";
+    String insertTemplate =
+        "INSERT INTO vehicle(id1, time, s1, s2, s3) VALUES (%s, %d, %d, %f, %s)";
+    for (int i = 1; i <= 3; i++) {
+      String database = String.format("group%d", i);
+      nonQuery(sqlHandler("", String.format("USE \"group%d\"", i)));
+      nonQuery(sqlHandler(database, createTable));
+
+      for (int j = 10; j < 20; j++) {
+        // Wrap the statement in the same JSON envelope as every other request; previously the
+        // bare SQL text was sent as the request body.
+        nonQuery(
+            sqlHandler(
+                database, String.format(Locale.CHINA, insertTemplate, i, j, j, j * 0.1, j)));
+      }
+    }
+    nonQuery(sqlHandler("", "FLUSH"));
+
+    for (int i = 1; i <= 3; i++) {
+      nonQuery(sqlHandler("", String.format("USE \"group%d\"", i)));
+      nonQuery(sqlHandler(String.format("group%d", i), createTable));
+    }
+    nonQuery(sqlHandler("", "FLUSH group1"));
+    nonQuery(sqlHandler("", "FLUSH group2,group3"));
+
+    for (int i = 1; i <= 3; i++) {
+      String database = String.format("group%d", i);
+      nonQuery(sqlHandler("", String.format("USE \"group%d\"", i)));
+      nonQuery(sqlHandler(database, createTable));
+
+      for (int j = 0; j < 30; j++) {
+        nonQuery(
+            sqlHandler(
+                database, String.format(Locale.CHINA, insertTemplate, i, j, j, j * 0.1, j)));
+      }
+    }
+    nonQuery(sqlHandler("", "FLUSH group1 TRUE"));
+    nonQuery(sqlHandler("", "FLUSH group2,group3 FALSE"));
+
+    for (int i = 1; i <= 3; i++) {
+      nonQuery(sqlHandler("", String.format("USE \"group%d\"", i)));
+      JsonObject jsonObject =
+          query(sqlHandler(String.format("group%d", i), "SELECT * FROM vehicle"));
+
+      JsonArray valuesList = jsonObject.getAsJsonArray("values");
+      // Check the total once per table. The former per-row check compared a running counter with
+      // 30 on the first row and could therefore only pass for an empty result.
+      assertEquals(30, valuesList.size());
+    }
+  }
+
+  public void testFlushGivenGroupNoData() {
+    List<String> list =
+        Arrays.asList(
+            "CREATE DATABASE nodatagroup1",
+            "CREATE DATABASE nodatagroup2",
+            "CREATE DATABASE nodatagroup3",
+            "FLUSH nodatagroup1",
+            "FLUSH nodatagroup2",
+            "FLUSH nodatagroup3",
+            "FLUSH nodatagroup1, nodatagroup2");
+    for (String sql : list) {
+      JsonObject jsonObject = new JsonObject();
+      jsonObject.addProperty("database", "");
+      jsonObject.addProperty("sql", sql);
+      nonQuery(jsonObject.toString());
+    }
+  }
+
+  /**
+   * Executes {@code sql} in {@code database} through {@link #query(String)} and asserts that the
+   * response reports the expected failure.
+   *
+   * @param sql statement that is expected to be rejected
+   * @param errMsg expected text in the form {@code "<code>: <message>"}
+   * @param database database name placed in the request
+   */
+  public void tableAssertTestFail(String sql, String errMsg, String database) {
+    JsonObject response = query(sqlHandler(database, sql));
+    String actual = response.get("code") + ": " + response.get("message").getAsString();
+    assertEquals(errMsg, actual);
+  }
+
+  /**
+   * Executes {@code sql} in {@code database} through {@link #query(String)} and compares the
+   * response with the expected header and rows.
+   *
+   * <p>Each returned row is rendered as its cell values, every one followed by a comma; a JSON
+   * null is rendered as {@code null}. That rendering is compared with the matching entry of
+   * {@code expectedRetArray}.
+   *
+   * @param sql statement to execute
+   * @param expectedHeader expected column names, in order
+   * @param expectedRetArray expected rows, in order, in the rendering described above
+   * @param database database name placed in the request
+   */
+  public void tableResultSetEqualTest(
+      String sql, String[] expectedHeader, String[] expectedRetArray, String database) {
+    JsonObject jsonObject = new JsonObject();
+    jsonObject.addProperty("database", database);
+    jsonObject.addProperty("sql", sql);
+    JsonObject result = query(jsonObject.toString());
+    JsonArray columnNames = result.get("column_names").getAsJsonArray();
+    JsonArray valuesList = result.get("values").getAsJsonArray();
+
+    // Compare sizes before indexing the expected arrays: an unexpected extra column or row must
+    // surface as an assertion failure, not as an ArrayIndexOutOfBoundsException.
+    assertEquals(expectedHeader.length, columnNames.size());
+    for (int i = 0; i < expectedHeader.length; i++) {
+      assertEquals(expectedHeader[i], columnNames.get(i).getAsString());
+    }
+
+    assertEquals(expectedRetArray.length, valuesList.size());
+    for (int i = 0; i < expectedRetArray.length; i++) {
+      StringBuilder builder = new StringBuilder();
+      JsonArray values = valuesList.get(i).getAsJsonArray();
+      for (int c = 0; c < values.size(); c++) {
+        if (values.get(c).isJsonNull()) {
+          // getAsString() is not usable on a JSON null; toString() renders it as "null"
+          builder.append(values.get(c).toString()).append(",");
+        } else {
+          builder.append(values.get(c).getAsString()).append(",");
+        }
+      }
+      assertEquals(expectedRetArray[i], builder.toString());
+    }
+  }
+
+  public void prepareTableData() {
+    for (int i = 0; i < sqls.length; i++) {
+      JsonObject jsonObject = new JsonObject();
+      if (i > 0) {
+        jsonObject.addProperty("database", DATABASE);
+      } else {
+        jsonObject.addProperty("database", "");
+      }
+      jsonObject.addProperty("sql", sqls[i]);
+      nonQuery(jsonObject.toString());
+    }
+  }
+
+  /**
+   * Delegates to {@code RestUtils.query} using this test's HTTP client and REST port.
+   *
+   * @param json request body
+   * @return the JSON object returned by {@code RestUtils.query}
+   */
+  public JsonObject query(String json) {
+    return RestUtils.query(httpClient, port, json);
+  }
+
+  /**
+   * Delegates to {@code RestUtils.nonQuery} using this test's HTTP client and REST port.
+   *
+   * @param json request body
+   * @return the JSON object returned by {@code RestUtils.nonQuery}
+   */
+  public JsonObject nonQuery(String json) {
+    return RestUtils.nonQuery(httpClient, port, json);
+  }
+}
diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/rest/it/IoTDBRestServiceIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/rest/it/IoTDBRestServiceIT.java
new file mode 100644
index 00000000000..8d3a3b54684
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/rest/it/IoTDBRestServiceIT.java
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.relational.it.rest.it;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+import org.apache.iotdb.itbase.category.RemoteIT;
+import org.apache.iotdb.itbase.env.BaseEnv;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.util.EntityUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class, RemoteIT.class})
+public class IoTDBRestServiceIT {
+
+  private int port = 18080;
+  private CloseableHttpClient httpClient = null;
+
+  /**
+   * Enables the REST service in the data node configuration, starts the cluster environment,
+   * reads the REST port of data node 0 and creates the HTTP client used by the test.
+   */
+  @Before
+  public void setUp() throws Exception {
+    BaseEnv env = EnvFactory.getEnv();
+    env.getConfig().getDataNodeConfig().setEnableRestService(true);
+    env.initClusterEnvironment();
+    port = EnvFactory.getEnv().getDataNodeWrapper(0).getRestServicePort();
+    httpClient = HttpClientBuilder.create().build();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    try {
+      if (httpClient != null) {
+        httpClient.close();
+      }
+    } catch (IOException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+    EnvFactory.getEnv().cleanClusterEnvironment();
+  }
+
+  private static final String DATABASE = "test";
+
+  private static final String[] sqls =
+      new String[] {
+        "create database if not exists test",
+        "use test",
+        "CREATE TABLE sg10(id1 string id, s1 int64 measurement, s2 float 
measurement, s3 string measurement)",
+        "CREATE TABLE sg11(id1 string id, s1 int64 measurement, s2 float 
measurement, s3 string measurement)"
+      };
+
+  /**
+   * Calls the REST {@code /ping} endpoint on 127.0.0.1 and asserts that both the HTTP status and
+   * the {@code code} field of the JSON body are 200.
+   *
+   * <p>The request is attempted up to 30 times with a one-second pause between attempts; the
+   * exception of the last attempt is rethrown. An {@link IOException} fails the test.
+   */
+  public void ping() {
+    HttpGet httpGet = new HttpGet("http://127.0.0.1:" + port + "/ping");
+    CloseableHttpResponse response = null;
+    try {
+      for (int i = 0; i < 30; i++) {
+        try {
+          response = httpClient.execute(httpGet);
+          break;
+        } catch (Exception e) {
+          if (i == 29) {
+            throw e;
+          }
+          try {
+            Thread.sleep(1000);
+          } catch (InterruptedException ex) {
+            // Restore the interrupt flag so the interruption stays observable to the caller.
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(ex);
+          }
+        }
+      }
+
+      HttpEntity responseEntity = response.getEntity();
+      String message = EntityUtils.toString(responseEntity, "utf-8");
+      JsonObject result = JsonParser.parseString(message).getAsJsonObject();
+      assertEquals(200, response.getStatusLine().getStatusCode());
+      assertEquals(200, Integer.parseInt(result.get("code").toString()));
+    } catch (IOException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    } finally {
+      try {
+        if (response != null) {
+          response.close();
+        }
+      } catch (IOException e) {
+        e.printStackTrace();
+        fail(e.getMessage());
+      }
+    }
+  }
+
+  /**
+   * Single JUnit entry point: pings the REST service, loads the fixture statements and then runs
+   * the scenario methods in the order listed below.
+   *
+   * <p>NOTE(review): {@code errorNonQuery2()} is defined in this class but is not called here.
+   */
+  @Test
+  public void test() {
+    ping();
+    prepareTableData();
+    rightNonQuery();
+    rightNonQuery2();
+    rightNonQuery3();
+    rightNonQuery4();
+    errorNonQuery();
+    errorNonQuery1();
+    errorNonQuery3();
+    testInsertMultiPartition();
+    testInsertTablet();
+    testInsertTabletNoDatabase();
+    testInsertTablet1();
+    testInsertTablet2();
+    testQuery();
+    testQuery1();
+    testQuery2();
+  }
+
+  /**
+   * Inserts two rows into {@code sg11} and reads them back ordered by time, checking every
+   * selected column of both rows.
+   */
+  public void testQuery() {
+    String sql =
+        "insert into sg11(id1,s1,s2,s3,time) values('aa',11,1.1,1,1),('aa2',21,2.1,2,2)";
+    JsonObject result = RestUtils.nonQuery(httpClient, port, sqlHandler("test", sql));
+    assertEquals(200, result.get("code").getAsInt());
+    JsonObject queryResult =
+        RestUtils.query(
+            httpClient,
+            port,
+            sqlHandler("test", "select id1,s1,s2,s3,time from sg11 order by time"));
+    JsonArray rows = queryResult.get("values").getAsJsonArray();
+    // Without this check an empty result would skip every assertion below and still pass.
+    assertEquals(2, rows.size());
+
+    JsonArray first = rows.get(0).getAsJsonArray();
+    assertEquals("aa", first.get(0).getAsString());
+    assertEquals(11, first.get(1).getAsInt());
+    assertEquals(1.1, first.get(2).getAsFloat(), 0.000001f);
+    assertEquals("1", first.get(3).getAsString());
+    assertEquals(1, first.get(4).getAsLong());
+
+    JsonArray second = rows.get(1).getAsJsonArray();
+    assertEquals("aa2", second.get(0).getAsString());
+    assertEquals(21, second.get(1).getAsInt());
+    assertEquals(2.1, second.get(2).getAsFloat(), 0.000001f);
+    assertEquals("2", second.get(3).getAsString());
+    assertEquals(2, second.get(4).getAsLong());
+  }
+
+  /** A query whose {@code database} property is null is expected to be rejected with code 305. */
+  public void testQuery1() {
+    String request = sqlHandler(null, "select id1,s1,s2,s3,time from sg11 order by time");
+    JsonObject response = RestUtils.query(httpClient, port, request);
+    assertEquals(305, response.get("code").getAsInt());
+    assertEquals("database should not be null", response.get("message").getAsString());
+  }
+
+  /** A query whose {@code sql} property is null is expected to be rejected with code 305. */
+  public void testQuery2() {
+    String request = sqlHandler("test", null);
+    JsonObject response = RestUtils.query(httpClient, port, request);
+    assertEquals(305, response.get("code").getAsInt());
+    assertEquals("sql should not be null", response.get("message").getAsString());
+  }
+
+  /** Creating database {@code test1} with an empty database name is expected to return 200. */
+  public void rightNonQuery() {
+    String request = sqlHandler("", "create database test1");
+    JsonObject response = RestUtils.nonQuery(httpClient, port, request);
+    assertEquals(200, response.get("code").getAsInt());
+  }
+
+  /** Inserting a single row into {@code sg10} of database {@code test} is expected to return 200. */
+  public void rightNonQuery2() {
+    String request = sqlHandler("test", "insert into sg10(id1,s1,time,s2) values('aa',1,1,1.1)");
+    JsonObject response = RestUtils.nonQuery(httpClient, port, request);
+    assertEquals(200, response.get("code").getAsInt());
+  }
+
+  public void rightNonQuery4() {
+    String sql = "insert into sg10(id1,s1,time,s2) 
values('aa',1,1,1.1),('bb',2,2,2.1)";
+    JsonObject result = RestUtils.nonQuery(httpClient, port, 
sqlHandler("test", sql));
+    assertEquals(200, result.get("code").getAsInt());
+  }
+
+  /** Dropping database {@code test1} while addressing database {@code test} is expected to return 200. */
+  public void rightNonQuery3() {
+    String request = sqlHandler("test", "drop database test1");
+    JsonObject response = RestUtils.nonQuery(httpClient, port, request);
+    assertEquals(200, response.get("code").getAsInt());
+  }
+
+  /** Creating the already existing database {@code test} is expected to fail with code 501. */
+  public void errorNonQuery() {
+    String request = sqlHandler("", "create database test");
+    JsonObject response = RestUtils.nonQuery(httpClient, port, request);
+    assertEquals(501, response.get("code").getAsInt());
+    assertEquals("Database test already exists", response.get("message").getAsString());
+  }
+
+  /** A CREATE TABLE request whose {@code database} property is null is expected to fail with code 305. */
+  public void errorNonQuery1() {
+    String createTable =
+        "CREATE TABLE sg10(id1 string id, s1 int64 measurement, s2 float measurement, "
+            + "s3 string measurement)";
+    JsonObject response = RestUtils.nonQuery(httpClient, port, sqlHandler(null, createTable));
+    assertEquals(305, response.get("code").getAsInt());
+    assertEquals("database should not be null", response.get("message").getAsString());
+  }
+
+  /**
+   * A CREATE DATABASE request whose {@code database} property is null is expected to fail with
+   * code 305.
+   *
+   * <p>NOTE(review): this scenario is not part of the call list in {@code test()}.
+   */
+  public void errorNonQuery2() {
+    String request = sqlHandler(null, "create database test");
+    JsonObject response = RestUtils.nonQuery(httpClient, port, request);
+    assertEquals(305, response.get("code").getAsInt());
+    assertEquals("database should not be null", response.get("message").getAsString());
+  }
+
+  /** Sending a SELECT through the nonQuery call is expected to fail with code 301. */
+  public void errorNonQuery3() {
+    String request = sqlHandler("test", "select * from sg10");
+    JsonObject response = RestUtils.nonQuery(httpClient, port, request);
+    assertEquals(301, response.get("code").getAsInt());
+    assertEquals("EXECUTE_STATEMENT_ERROR", response.get("message").getAsString());
+  }
+
+  /**
+   * Builds the JSON request body for a statement.
+   *
+   * @param database value of the {@code database} property; callers also pass null
+   * @param sql value of the {@code sql} property; callers also pass null
+   * @return the serialized JSON object with the two properties, {@code database} first
+   */
+  public String sqlHandler(String database, String sql) {
+    JsonObject request = new JsonObject();
+    request.addProperty("database", database);
+    request.addProperty("sql", sql);
+    return request.toString();
+  }
+
+  public void testInsertMultiPartition() {
+    List<String> sqls =
+        Arrays.asList(
+            "create table sg1 (id1 string id, s1 int32 measurement)",
+            "insert into sg1(id1,time,s1) values('d1',1,2)",
+            "flush",
+            "insert into sg1(id1,time,s1) values('d1',2,2)",
+            "insert into sg1(id1,time,s1) values('d1',604800001,2)",
+            "flush");
+    for (String sql : sqls) {
+      RestUtils.nonQuery(httpClient, port, sqlHandler("test", sql));
+    }
+  }
+
+  /**
+   * Creates table {@code sg211} in database {@code test} and passes a five-row tablet for it to
+   * {@code rightInsertTablet}.
+   */
+  public void testInsertTablet() {
+    String createTable =
+        "create table sg211 (id1 string id,t1 STRING ATTRIBUTE, s1 FLOAT measurement)";
+    RestUtils.nonQuery(httpClient, port, sqlHandler("test", createTable));
+    String json =
+        "{\"database\":\"test\",\"column_types\":[\"ID\",\"ATTRIBUTE\",\"MEASUREMENT\"],\"timestamps\":[1635232143960,1635232153960,1635232163960,1635232173960,1635232183960],\"column_names\":[\"id1\",\"t1\",\"s1\"],\"data_types\":[\"STRING\",\"STRING\",\"FLOAT\"],\"values\":[[\"a11\",\"true\",11],[\"a11\",\"false\",22],[\"a13\",\"false1\",23],[\"a14\",\"false2\",24],[\"a15\",\"false3\",25]],\"table\":\"sg211\"}";
+    rightInsertTablet(json);
+  }
+
+  /**
+   * Sends a tablet whose {@code database} property is the empty string and expects the insert to
+   * be rejected with code 305.
+   */
+  public void testInsertTabletNoDatabase() {
+    String createTable =
+        "create table sg211 (id1 string id,t1 STRING ATTRIBUTE, s1 FLOAT measurement)";
+    RestUtils.nonQuery(httpClient, port, sqlHandler("test", createTable));
+    String json =
+        "{\"database\":\"\",\"column_types\":[\"ID\",\"ATTRIBUTE\",\"MEASUREMENT\"],\"timestamps\":[1635232143960,1635232153960,1635232163960,1635232173960,1635232183960],\"column_names\":[\"id1\",\"t1\",\"s1\"],\"data_types\":[\"STRING\",\"STRING\",\"FLOAT\"],\"values\":[[\"a11\",\"true\",11],[\"a11\",\"false\",22],[\"a13\",\"false1\",23],[\"a14\",\"false2\",24],[\"a15\",\"false3\",25]],\"table\":\"sg211\"}";
+    JsonObject response = RestUtils.insertTablet(httpClient, port, json);
+    assertEquals(305, Integer.parseInt(response.get("code").toString()));
+  }
+
+  /**
+   * Posts a tablet that lists three column names and data types but only two column types, and
+   * expects code 305 together with the size-mismatch message.
+   */
+  public void testInsertTablet1() {
+    String createTable =
+        "create table sg211 (id1 string id,t1 STRING ATTRIBUTE, s1 FLOAT " + "measurement)";
+    RestUtils.nonQuery(httpClient, port, sqlHandler("test", createTable));
+
+    String tablet =
+        "{\"database\":\"test\",\"column_types\":[\"ATTRIBUTE\",\"MEASUREMENT\"],\"timestamps\":[1635232143960,1635232153960,1635232163960,1635232173960,1635232183960],\"column_names\":[\"id1\",\"t1\",\"s1\"],\"data_types\":[\"STRING\",\"STRING\",\"FLOAT\"],\"values\":[[\"a11\",\"true\",11],[\"a11\",\"false\",22],[\"a13\",\"false1\",23],[\"a14\",\"false2\",24],[\"a15\",\"false3\",25]],\"table\":\"sg211\"}";
+    JsonObject response = RestUtils.insertTablet(httpClient, port, tablet);
+
+    int code = Integer.parseInt(response.get("code").toString());
+    assertEquals(305, code);
+    String expectedMessage =
+        "column_names and column_types should have the same size,column_types "
+            + "and data_types should have the same size";
+    assertEquals(expectedMessage, response.get("message").getAsString());
+  }
+
+  /**
+   * Posts a tablet that carries four timestamps but five value rows, and expects code 305 with the
+   * message the service reports for that request.
+   */
+  public void testInsertTablet2() {
+    String createTable =
+        "create table sg211 (id1 string id,t1 STRING ATTRIBUTE, s1 FLOAT " + "measurement)";
+    RestUtils.nonQuery(httpClient, port, sqlHandler("test", createTable));
+
+    String tablet =
+        "{\"database\":\"test\",\"column_types\":[\"ID\",\"ATTRIBUTE\",\"MEASUREMENT\"],\"timestamps\":[1635232143960,1635232153960,1635232163960,1635232183960],\"column_names\":[\"id1\",\"t1\",\"s1\"],\"data_types\":[\"STRING\",\"STRING\",\"FLOAT\"],\"values\":[[\"a11\",\"true\",11],[\"a11\",\"false\",22],[\"a13\",\"false1\",23],[\"a14\",\"false2\",24],[\"a15\",\"false3\",25]],\"table\":\"sg211\"}";
+    JsonObject response = RestUtils.insertTablet(httpClient, port, tablet);
+
+    int code = Integer.parseInt(response.get("code").toString());
+    assertEquals(305, code);
+    String actualMessage = response.get("message").getAsString();
+    assertEquals("values and data_types should have the same size", actualMessage);
+  }
+
+  /**
+   * Posts {@code json} to the insertTablet endpoint, expects code 200, then reads table {@code
+   * sg211} back ordered by time and checks the first returned row.
+   *
+   * @param json request body for the insertTablet endpoint
+   */
+  public void rightInsertTablet(String json) {
+    JsonObject insertResponse = RestUtils.insertTablet(httpClient, port, json);
+    int code = Integer.parseInt(insertResponse.get("code").toString());
+    assertEquals(200, code);
+
+    String selectRequest = sqlHandler("test", "select id1,t1,s1 from sg211 order by time");
+    JsonObject selectResponse = RestUtils.query(httpClient, port, selectRequest);
+    JsonArray firstRow = selectResponse.get("values").getAsJsonArray().get(0).getAsJsonArray();
+
+    // Expected first row: id1 = "a11", t1 = "false", s1 = 11.
+    assertEquals("a11", firstRow.get(0).getAsString());
+    assertEquals("false", firstRow.get(1).getAsString());
+    assertEquals(11f, firstRow.get(2).getAsFloat(), 0f);
+  }
+
+  /**
+   * Sends every statement in {@code sqls} to the nonQuery endpoint. The first statement is sent
+   * with an empty database name; all later ones are sent with {@code DATABASE}.
+   */
+  public void prepareTableData() {
+    int index = 0;
+    for (String statement : sqls) {
+      JsonObject request = new JsonObject();
+      if (index == 0) {
+        request.addProperty("database", "");
+      } else {
+        request.addProperty("database", DATABASE);
+      }
+      request.addProperty("sql", statement);
+      RestUtils.nonQuery(httpClient, port, request.toString());
+      index++;
+    }
+  }
+}
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/rest/it/IoTDBRestServiceInsertAlignedValuesIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/rest/it/IoTDBRestServiceInsertAlignedValuesIT.java
new file mode 100644
index 00000000000..a3b8ca7a880
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/rest/it/IoTDBRestServiceInsertAlignedValuesIT.java
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.relational.it.rest.it;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+import org.apache.iotdb.itbase.category.RemoteIT;
+import org.apache.iotdb.itbase.env.BaseEnv;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.util.EntityUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Integration test that drives row inserts and read-backs through the table-model REST endpoints
+ * ({@code /rest/table/v1/nonQuery} and {@code /rest/table/v1/query}, reached via {@link
+ * RestUtils}).
+ *
+ * <p>All scenarios run sequentially from the single {@link #test()} entry point against one
+ * cluster environment, so each scenario uses its own table inside database {@code t1}.
+ */
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class, RemoteIT.class})
+public class IoTDBRestServiceInsertAlignedValuesIT {
+
+  // Overwritten in setUp() with the REST port of data node 0.
+  private int port = 18080;
+  private CloseableHttpClient httpClient = null;
+
+  /** Starts a cluster with the REST service enabled and opens the shared HTTP client. */
+  @Before
+  public void setUp() throws Exception {
+    BaseEnv baseEnv = EnvFactory.getEnv();
+    baseEnv.getConfig().getDataNodeConfig().setEnableRestService(true);
+    baseEnv.initClusterEnvironment();
+    DataNodeWrapper portConflictDataNodeWrapper = EnvFactory.getEnv().getDataNodeWrapper(0);
+    port = portConflictDataNodeWrapper.getRestServicePort();
+    httpClient = HttpClientBuilder.create().build();
+  }
+
+  /** Closes the HTTP client and tears the cluster down. */
+  @After
+  public void tearDown() throws Exception {
+    try {
+      if (httpClient != null) {
+        httpClient.close();
+      }
+    } catch (IOException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    } finally {
+      // Release the cluster even when closing the client failed the test; otherwise the
+      // environment would leak into the next test class.
+      EnvFactory.getEnv().cleanClusterEnvironment();
+    }
+  }
+
+  private static final String DATABASE = "test";
+
+  private static final String[] sqls = new String[] {"CREATE DATABASE t1"};
+
+  /**
+   * Calls {@code /ping} until it answers, trying up to 30 times with a one-second pause between
+   * attempts, then asserts HTTP status 200 and a body {@code code} of 200.
+   */
+  public void ping() {
+    HttpGet httpGet = new HttpGet("http://127.0.0.1:" + port + "/ping");
+    CloseableHttpResponse response = null;
+    try {
+      for (int i = 0; i < 30; i++) {
+        try {
+          response = httpClient.execute(httpGet);
+          break;
+        } catch (Exception e) {
+          // Give up only after the last attempt.
+          if (i == 29) {
+            throw e;
+          }
+          try {
+            Thread.sleep(1000);
+          } catch (InterruptedException ex) {
+            // Restore the interrupt flag before aborting the retry loop.
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(ex);
+          }
+        }
+      }
+
+      HttpEntity responseEntity = response.getEntity();
+      String message = EntityUtils.toString(responseEntity, "utf-8");
+      JsonObject result = JsonParser.parseString(message).getAsJsonObject();
+      assertEquals(200, response.getStatusLine().getStatusCode());
+      assertEquals(200, Integer.parseInt(result.get("code").toString()));
+    } catch (IOException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    } finally {
+      try {
+        if (response != null) {
+          response.close();
+        }
+      } catch (IOException e) {
+        e.printStackTrace();
+        fail(e.getMessage());
+      }
+    }
+  }
+
+  /** Single JUnit entry point; runs every scenario in order against the same cluster. */
+  @Test
+  public void test() {
+    ping();
+    prepareTableData();
+    testInsertAlignedValues();
+    testUpdatingAlignedValues();
+    testInsertAlignedValuesWithSameTimestamp();
+    testInsertWithWrongMeasurementNum1();
+    testInsertWithWrongMeasurementNum2();
+    testInsertWithDuplicatedMeasurements();
+    testInsertMultiRows();
+    testInsertLargeNumber();
+    testExtendTextColumn();
+  }
+
+  /**
+   * Builds the JSON request body used by the query and nonQuery endpoints.
+   *
+   * @param database value of the {@code database} property
+   * @param sql value of the {@code sql} property
+   * @return the serialized JSON object
+   */
+  public String sqlHandler(String database, String sql) {
+    JsonObject json = new JsonObject();
+    json.addProperty("database", database);
+    json.addProperty("sql", sql);
+    return json.toString();
+  }
+
+  /** Inserts three complete rows into {@code wf01} and checks them through two queries. */
+  public void testInsertAlignedValues() {
+    List<String> statements =
+        Arrays.asList(
+            "create table wf01 (id1 string id, status boolean measurement, "
+                + "temperature float measurement)",
+            "insert into wf01(id1, time, status, temperature) values ('wt01', "
+                + "4000, true, 17.1)",
+            "insert into wf01(id1, time, status, temperature) values ('wt01', "
+                + "5000, true, 20.1)",
+            "insert into wf01(id1, time, status, temperature) values ('wt01', "
+                + "6000, true, 22)");
+    for (String sql : statements) {
+      nonQuery(sqlHandler("t1", sql));
+    }
+    JsonObject jsonObject = query(sqlHandler("t1", "select time, status from wf01"));
+    JsonArray valuesList = jsonObject.getAsJsonArray("values");
+    for (int i = 0; i < valuesList.size(); i++) {
+      JsonArray jsonArray = valuesList.get(i).getAsJsonArray();
+      assertTrue(jsonArray.get(1).getAsBoolean());
+    }
+
+    jsonObject = query(sqlHandler("t1", "select time, status, temperature from wf01"));
+    valuesList = jsonObject.getAsJsonArray("values");
+    for (int i = 0; i < valuesList.size(); i++) {
+      JsonArray jsonArray = valuesList.get(i).getAsJsonArray();
+      if (i == 0) {
+        assertEquals(4000, jsonArray.get(0).getAsLong());
+        assertTrue(jsonArray.get(1).getAsBoolean());
+        assertEquals(17.1, jsonArray.get(2).getAsDouble(), 0.1);
+      } else if (i == 1) {
+        assertEquals(5000, jsonArray.get(0).getAsLong());
+        assertTrue(jsonArray.get(1).getAsBoolean());
+        assertEquals(20.1, jsonArray.get(2).getAsDouble(), 0.1);
+      } else if (i == 2) {
+        assertEquals(6000, jsonArray.get(0).getAsLong());
+        assertTrue(jsonArray.get(1).getAsBoolean());
+        assertEquals(22.0, jsonArray.get(2).getAsDouble(), 0.1);
+      }
+    }
+  }
+
+  /**
+   * Writes partial rows into {@code wf03} (some inserts set only {@code status}, others only
+   * {@code temperature}) and checks that rows sharing a timestamp are merged while the column
+   * never written at time 6000 reads back as JSON null.
+   */
+  public void testUpdatingAlignedValues() {
+    List<String> statements =
+        Arrays.asList(
+            "create table wf03 (id1 string id, status boolean measurement, "
+                + "temperature float measurement)",
+            "insert into wf03(id1, time, status, temperature) values ('wt01', "
+                + "4000, true, 17.1)",
+            "insert into wf03(id1, time, status) values ('wt01', 5000, true)",
+            "insert into wf03(id1, time, temperature)values ('wt01', 5000, 20.1)",
+            "insert into wf03(id1, time, temperature)values ('wt01', 6000, 22)");
+    for (String sql : statements) {
+      nonQuery(sqlHandler("t1", sql));
+    }
+
+    JsonObject jsonObject = query(sqlHandler("t1", "select time, status from wf03"));
+    JsonArray valuesList = jsonObject.getAsJsonArray("values");
+    for (int i = 0; i < valuesList.size(); i++) {
+      JsonArray jsonArray = valuesList.get(i).getAsJsonArray();
+      if (i >= 2) {
+        assertTrue(jsonArray.get(1).isJsonNull());
+      } else {
+        assertTrue(jsonArray.get(1).getAsBoolean());
+      }
+    }
+    jsonObject = query(sqlHandler("t1", "select time, status, temperature from wf03"));
+    valuesList = jsonObject.getAsJsonArray("values");
+    for (int i = 0; i < valuesList.size(); i++) {
+      JsonArray jsonArray = valuesList.get(i).getAsJsonArray();
+      if (i == 0) {
+        assertEquals(4000, jsonArray.get(0).getAsLong());
+        assertTrue(jsonArray.get(1).getAsBoolean());
+        assertEquals(17.1, jsonArray.get(2).getAsDouble(), 0.1);
+      } else if (i == 1) {
+        assertEquals(5000, jsonArray.get(0).getAsLong());
+        assertTrue(jsonArray.get(1).getAsBoolean());
+        assertEquals(20.1, jsonArray.get(2).getAsDouble(), 0.1);
+      } else if (i == 2) {
+        assertEquals(6000, jsonArray.get(0).getAsLong());
+        assertTrue(jsonArray.get(1).isJsonNull());
+        assertEquals(22.0f, jsonArray.get(2).getAsFloat(), 0.1);
+      }
+    }
+  }
+
+  /**
+   * Writes {@code s2} and {@code s1} of {@code sg3} in two separate inserts that share timestamp 1
+   * and checks that every returned row carries both values.
+   */
+  public void testInsertAlignedValuesWithSameTimestamp() {
+    List<String> statements =
+        Arrays.asList(
+            "create table sg3 (id1 string id, s2 double measurement, s1 double measurement)",
+            "insert into sg3(id1,time,s2) values('d1',1,2)",
+            "insert into sg3(id1,time,s1) values('d1',1,2)");
+    for (String sql : statements) {
+      nonQuery(sqlHandler("t1", sql));
+    }
+
+    JsonObject jsonObject = query(sqlHandler("t1", "select time, s1, s2 from sg3"));
+    JsonArray valuesList = jsonObject.getAsJsonArray("values");
+    for (int i = 0; i < valuesList.size(); i++) {
+      // One check per row is enough; the columns are addressed by fixed index.
+      JsonArray jsonArray = valuesList.get(i).getAsJsonArray();
+      assertEquals(1, jsonArray.get(0).getAsLong());
+      assertEquals(2.0d, jsonArray.get(1).getAsDouble(), 0.1);
+      assertEquals(2.0d, jsonArray.get(2).getAsDouble(), 0.1);
+    }
+  }
+
+  /** Inserts with fewer values than non-time columns and expects error 701. */
+  public void testInsertWithWrongMeasurementNum1() {
+    nonQuery(
+        sqlHandler(
+            "t1",
+            "create table wf04 (id1 string id, status int32, temperature int32 measurement)"));
+    JsonObject jsonObject =
+        nonQuery(
+            sqlHandler(
+                "t1",
+                "insert into wf04(id1, time, status, temperature) "
+                    + "values('wt01', 11000, 100)"));
+    assertEquals(
+        "701: Inconsistent numbers of non-time column names and values: 3-2",
+        jsonObject.get("code") + ": " + jsonObject.get("message").getAsString());
+  }
+
+  /** Inserts with more values than non-time columns and expects error 701. */
+  public void testInsertWithWrongMeasurementNum2() {
+    // Create the table the insert below actually targets (wf05, not wf04 again).
+    nonQuery(
+        sqlHandler(
+            "t1",
+            "create table wf05 (id1 string id, status int32, temperature int32 measurement)"));
+    JsonObject jsonObject =
+        nonQuery(
+            sqlHandler(
+                "t1",
+                "insert into wf05(id1, time, status, temperature) "
+                    + "values('wt01', 11000, 100, 300, 400)"));
+    assertEquals(
+        "701: Inconsistent numbers of non-time column names and values: 3-4",
+        jsonObject.get("code") + ": " + jsonObject.get("message").getAsString());
+  }
+
+  /** Names column {@code status} twice in one insert and expects error 701. */
+  public void testInsertWithDuplicatedMeasurements() {
+    nonQuery(
+        sqlHandler("t1", "create table wf07(id1 string id, s3 boolean measurement, status int32)"));
+    JsonObject jsonObject =
+        nonQuery(
+            sqlHandler(
+                "t1",
+                "insert into wf07(id1, time, s3, status, status) "
+                    + "values('wt01', 100, true, 20.1, 20.2)"));
+    assertEquals(
+        "701: Insertion contains duplicated measurement: status",
+        jsonObject.get("code") + ": " + jsonObject.get("message").getAsString());
+  }
+
+  /** Sends a multi-row insert whose last row has mistyped values and expects error 507. */
+  public void testInsertMultiRows() {
+    nonQuery(
+        sqlHandler(
+            "t1", "create table sg8 (id1 string id, s1 int32 measurement, s2 int32 measurement)"));
+    JsonObject jsonObject =
+        nonQuery(
+            sqlHandler(
+                "t1",
+                "insert into sg8(id1, time, s1, s2) values('d1', 10, 2, 2), "
+                    + "('d1', 11, 3, '3'), ('d1', 12,12.11,false)"));
+    assertEquals(
+        "507: Fail to insert measurements [s1, s2] caused by [data type is not "
+            + "consistent, input 12.11, registered INT32, data type is not consistent, input "
+            + "false, registered INT32]",
+        jsonObject.get("code") + ": " + jsonObject.get("message").getAsString());
+  }
+
+  /** Inserts a literal too large for the parser and expects error 700 at line 1, column 58. */
+  public void testInsertLargeNumber() {
+    nonQuery(
+        sqlHandler(
+            "t1",
+            "create table sg9 (id1 string id, s98 int64 measurement, s99 int64 measurement)"));
+    JsonObject jsonObject =
+        nonQuery(
+            sqlHandler(
+                "t1",
+                "insert into sg9(id1, time, s98, s99) values('d1', 10, 2, "
+                    + "271840880000000000000000)"));
+    assertEquals(
+        "700: line 1:58: Invalid numeric literal: 271840880000000000000000",
+        jsonObject.get("code") + ": " + jsonObject.get("message").getAsString());
+  }
+
+  /**
+   * Smoke test: replays a series of inserts into {@code sg14}, two of which name a column ({@code
+   * s3}) that the create statement does not declare. Responses are not asserted.
+   *
+   * <p>NOTE(review): the requests are sent with {@code DATABASE} ("test") although the statement
+   * list starts with {@code use t1} and this class only creates {@code t1} — confirm which
+   * database is intended.
+   */
+  public void testExtendTextColumn() {
+    List<String> statements =
+        Arrays.asList(
+            "use t1",
+            "create table sg14 (id1 string id, s1 string measurement, s2 string measurement)",
+            "insert into sg14(id1,time,s1,s2) values('d1',1,'test','test')",
+            "insert into sg14(id1,time,s1,s2) values('d1',3,'test','test')",
+            "insert into sg14(id1,time,s1,s2) values('d1',3,'test','test')",
+            "insert into sg14(id1,time,s1,s2) values('d1',4,'test','test')",
+            "insert into sg14(id1,time,s1,s3) values('d1',5,'test','test')",
+            "insert into sg14(id1,time,s1,s2) values('d1',6,'test','test')",
+            "flush",
+            "insert into sg14(id1,time,s1,s3) values('d1',7,'test','test')");
+    try {
+      for (String sql : statements) {
+        nonQuery(sqlHandler(DATABASE, sql));
+      }
+    } catch (Exception ignored) {
+      // Best-effort by design: this scenario only checks that the sequence does not break the
+      // service, so client-side runtime failures are deliberately not reported.
+    }
+  }
+
+  /**
+   * Runs the statements in {@code sqls}. The first one is sent with an empty database name; any
+   * later one would be sent with {@code DATABASE}.
+   */
+  public void prepareTableData() {
+    for (int i = 0; i < sqls.length; i++) {
+      JsonObject jsonObject = new JsonObject();
+      if (i > 0) {
+        jsonObject.addProperty("database", DATABASE);
+      } else {
+        jsonObject.addProperty("database", "");
+      }
+      jsonObject.addProperty("sql", sqls[i]);
+      nonQuery(jsonObject.toString());
+    }
+  }
+
+  /** Posts {@code json} to the query endpoint and returns the parsed response. */
+  public JsonObject query(String json) {
+    return RestUtils.query(httpClient, port, json);
+  }
+
+  /** Posts {@code json} to the nonQuery endpoint and returns the parsed response. */
+  public JsonObject nonQuery(String json) {
+    return RestUtils.nonQuery(httpClient, port, json);
+  }
+}
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/rest/it/RestUtils.java
 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/rest/it/RestUtils.java
new file mode 100644
index 00000000000..cf70f9900f4
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/rest/it/RestUtils.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.relational.it.rest.it;
+
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.util.EntityUtils;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+
+import static org.junit.Assert.fail;
+
+/**
+ * HTTP helpers for the table-model REST integration tests. Each public method posts a JSON body
+ * to one {@code /rest/table/v1} endpoint on 127.0.0.1 and returns the parsed JSON response.
+ */
+public class RestUtils {
+
+  /**
+   * Posts {@code json} to {@code /rest/table/v1/insertTablet}.
+   *
+   * @param httpClient client used to execute the request
+   * @param port REST service port
+   * @param json request body
+   * @return the response body parsed as a JSON object
+   */
+  public static JsonObject insertTablet(CloseableHttpClient httpClient, int port, String json) {
+    return post(httpClient, port, "/rest/table/v1/insertTablet", json);
+  }
+
+  /** Builds a JSON POST request for {@code url} carrying the root/root credentials. */
+  private static HttpPost getHttpPost(String url) {
+    HttpPost httpPost = new HttpPost(url);
+    httpPost.addHeader("Content-type", "application/json; charset=utf-8");
+    httpPost.setHeader("Accept", "application/json");
+    String authorization = getAuthorization("root", "root");
+    httpPost.setHeader("Authorization", authorization);
+    return httpPost;
+  }
+
+  /** Returns the Base64 encoding of {@code username:password}. */
+  private static String getAuthorization(String username, String password) {
+    return Base64.getEncoder()
+        .encodeToString((username + ":" + password).getBytes(StandardCharsets.UTF_8));
+  }
+
+  /**
+   * Posts {@code json} to {@code /rest/table/v1/query}.
+   *
+   * @param httpClient client used to execute the request
+   * @param port REST service port
+   * @param json request body
+   * @return the response body parsed as a JSON object
+   */
+  public static JsonObject query(CloseableHttpClient httpClient, int port, String json) {
+    return post(httpClient, port, "/rest/table/v1/query", json);
+  }
+
+  /**
+   * Posts {@code json} to {@code /rest/table/v1/nonQuery}.
+   *
+   * @param httpClient client used to execute the request
+   * @param port REST service port
+   * @param json request body
+   * @return the response body parsed as a JSON object
+   */
+  public static JsonObject nonQuery(CloseableHttpClient httpClient, int port, String json) {
+    return post(httpClient, port, "/rest/table/v1/nonQuery", json);
+  }
+
+  /**
+   * Shared implementation of the three endpoint helpers. An {@link IOException} while sending,
+   * reading or closing the response fails the running test.
+   */
+  private static JsonObject post(
+      CloseableHttpClient httpClient, int port, String path, String json) {
+    HttpPost httpPost = getHttpPost("http://127.0.0.1:" + port + path);
+    // Encode the body as UTF-8 so it matches the charset announced in the Content-type header
+    // regardless of the platform default.
+    httpPost.setEntity(new StringEntity(json, StandardCharsets.UTF_8));
+    try (CloseableHttpResponse response = httpClient.execute(httpPost)) {
+      HttpEntity responseEntity = response.getEntity();
+      String message = EntityUtils.toString(responseEntity, "utf-8");
+      return JsonParser.parseString(message).getAsJsonObject();
+    } catch (IOException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+    // Not reached in practice: fail() always throws. Kept to satisfy the compiler.
+    return new JsonObject();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/table/v1/handler/ExceptionHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/table/v1/handler/ExceptionHandler.java
new file mode 100644
index 00000000000..6bcf93bf8c7
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/table/v1/handler/ExceptionHandler.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iotdb.db.protocol.rest.table.v1.handler;
+
+import org.apache.iotdb.commons.auth.AuthException;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.metadata.DatabaseNotSetException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
+import org.apache.iotdb.db.protocol.rest.model.ExecutionStatus;
+import 
org.apache.iotdb.db.queryengine.plan.relational.sql.parser.ParsingException;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.antlr.v4.runtime.misc.ParseCancellationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.Response.Status;
+
+import java.io.IOException;
+
+/** Translates exceptions raised while serving a table-model REST request into a status body. */
+public class ExceptionHandler {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ExceptionHandler.class);
+
+  private ExceptionHandler() {}
+
+  /**
+   * Builds the {@link ExecutionStatus} reported to the client for {@code e} and logs the exception
+   * at WARN level.
+   *
+   * @param e the exception to translate
+   * @return a status whose message is {@code e.getMessage()} and whose code depends on the
+   *     exception type
+   */
+  public static ExecutionStatus tryCatchException(Exception e) {
+    ExecutionStatus status = new ExecutionStatus();
+    status.setMessage(e.getMessage());
+    status.setCode(resolveCode(e));
+    LOGGER.warn(e.getMessage(), e);
+    return status;
+  }
+
+  /** Picks the status code for {@code e}; the first matching type wins, so order matters. */
+  private static int resolveCode(Exception e) {
+    if (e instanceof QueryProcessException) {
+      return ((QueryProcessException) e).getErrorCode();
+    }
+    if (e instanceof DatabaseNotSetException) {
+      return ((DatabaseNotSetException) e).getErrorCode();
+    }
+    if (e instanceof StorageEngineException) {
+      return ((StorageEngineException) e).getErrorCode();
+    }
+    if (e instanceof AuthException) {
+      return Status.BAD_REQUEST.getStatusCode();
+    }
+    if (e instanceof IllegalPathException) {
+      return ((IllegalPathException) e).getErrorCode();
+    }
+    if (e instanceof MetadataException) {
+      return ((MetadataException) e).getErrorCode();
+    }
+    if (e instanceof IoTDBException) {
+      return ((IoTDBException) e).getErrorCode();
+    }
+    if (e instanceof ParseCancellationException) {
+      return TSStatusCode.SQL_PARSE_ERROR.getStatusCode();
+    }
+    if (e instanceof StatementAnalyzeException) {
+      return TSStatusCode.METADATA_ERROR.getStatusCode();
+    }
+    if (e instanceof SemanticException) {
+      return TSStatusCode.SEMANTIC_ERROR.getStatusCode();
+    }
+    boolean ioOrRuntime = e instanceof IOException || e instanceof RuntimeException;
+    if (!ioOrRuntime) {
+      // Any other exception that is neither an IOException nor a RuntimeException.
+      return TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode();
+    }
+    if (e instanceof ParsingException || e instanceof IllegalArgumentException) {
+      return TSStatusCode.SQL_PARSE_ERROR.getStatusCode();
+    }
+    return TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/table/v1/handler/ExecuteStatementHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/table/v1/handler/ExecuteStatementHandler.java
new file mode 100644
index 00000000000..6f29c11f6c6
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/table/v1/handler/ExecuteStatementHandler.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iotdb.db.protocol.rest.table.v1.handler;
+
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CountDevice;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DescribeTable;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Explain;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ExplainAnalyze;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Query;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowAINodes;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowCluster;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowClusterId;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowConfigNodes;
+import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowCurrentDatabase;
+import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowCurrentSqlDialect;
+import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowCurrentTimestamp;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowCurrentUser;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowDB;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowDataNodes;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowDevice;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowFunctions;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowIndex;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowPipePlugins;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowPipes;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowRegions;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowTables;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowVariables;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowVersion;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/** Decides whether a parsed table-model statement passes {@link #validateStatement}. */
+public class ExecuteStatementHandler {
+  private ExecuteStatementHandler() {}
+
+  private static final Set<Class<? extends Statement>> INVALID_STATEMENT_CLASSES =
+      collectInvalidStatementClasses();
+
+  /** Builds the set of statement classes that {@link #validateStatement} rejects. */
+  private static Set<Class<? extends Statement>> collectInvalidStatementClasses() {
+    Set<Class<? extends Statement>> rejected = new HashSet<>();
+    rejected.add(CountDevice.class);
+    rejected.add(DescribeTable.class);
+    rejected.add(Explain.class);
+    rejected.add(ExplainAnalyze.class);
+    rejected.add(Query.class);
+    rejected.add(ShowAINodes.class);
+    rejected.add(ShowCluster.class);
+    rejected.add(ShowClusterId.class);
+    rejected.add(ShowConfigNodes.class);
+    rejected.add(ShowCurrentDatabase.class);
+    rejected.add(ShowCurrentSqlDialect.class);
+    rejected.add(ShowCurrentTimestamp.class);
+    rejected.add(ShowCurrentUser.class);
+    rejected.add(ShowDB.class);
+    rejected.add(ShowDataNodes.class);
+    rejected.add(ShowDevice.class);
+    rejected.add(ShowFunctions.class);
+    rejected.add(ShowIndex.class);
+    rejected.add(ShowPipePlugins.class);
+    rejected.add(ShowPipes.class);
+    rejected.add(ShowRegions.class);
+    rejected.add(ShowTables.class);
+    rejected.add(ShowVariables.class);
+    rejected.add(ShowVersion.class);
+    return rejected;
+  }
+
+  /**
+   * Checks a statement against the rejected classes.
+   *
+   * @param statement the parsed statement
+   * @return {@code false} when the statement's exact runtime class is one of the rejected classes,
+   *     {@code true} otherwise (subclasses of a rejected class are not matched)
+   */
+  public static boolean validateStatement(Statement statement) {
+    Class<? extends Statement> actualType = statement.getClass();
+    boolean rejected = INVALID_STATEMENT_CLASSES.contains(actualType);
+    return !rejected;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/table/v1/handler/QueryDataSetHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/table/v1/handler/QueryDataSetHandler.java
new file mode 100644
index 00000000000..149b99fc028
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/table/v1/handler/QueryDataSetHandler.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iotdb.db.protocol.rest.table.v1.handler;
+
+import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.db.protocol.rest.model.ExecutionStatus;
+import org.apache.iotdb.db.protocol.rest.table.v1.model.QueryDataSet;
+import org.apache.iotdb.db.queryengine.common.header.DatasetHeader;
+import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Query;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.block.TsBlock;
+
+import javax.ws.rs.core.Response;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+public class QueryDataSetHandler {
+
+  private QueryDataSetHandler() {}
+
+  /**
+   * Builds the REST response for an executed statement, picking the filler by statement type: a
+   * {@link Query} goes through the two-argument {@code fillQueryDataSet}, every other statement
+   * through {@code fillOtherDataSet}.
+   *
+   * @param queryExecution execution whose result batches are read
+   * @param statement the statement that was executed
+   * @param actualRowSizeLimit max number of rows to return. no limit when actualRowSizeLimit <= 0.
+   * @return the response produced by the selected filler
+   * @throws IoTDBException propagated from the selected filler
+   */
+  public static Response fillQueryDataSet(
+      IQueryExecution queryExecution, Statement statement, int actualRowSizeLimit)
+      throws IoTDBException {
+    return statement instanceof Query
+        ? fillQueryDataSet(queryExecution, actualRowSizeLimit)
+        : fillOtherDataSet(queryExecution, actualRowSizeLimit);
+  }
+
+  /**
+   * Reads every result batch of a query into per-column lists and returns them transposed into
+   * rows.
+   *
+   * <p>Column names and data type names are taken from the dataset header; the source column of
+   * each result column is looked up by name in the header's column-name index map.
+   *
+   * @param queryExecution execution whose batches are drained through {@code getBatchResult()}
+   * @param actualRowSizeLimit row limit; only enforced when greater than 0
+   * @return a response carrying the filled {@link QueryDataSet}, or an {@link ExecutionStatus} with
+   *     code {@code QUERY_PROCESS_ERROR} once the fetched row count has reached the limit
+   * @throws IoTDBException declared for the calls made on {@code queryExecution}
+   */
+  public static Response fillQueryDataSet(
+      IQueryExecution queryExecution, final int actualRowSizeLimit) throws 
IoTDBException {
+    QueryDataSet targetDataSet = new QueryDataSet();
+    int fetched = 0;
+    int columnNum = queryExecution.getOutputValueColumnCount();
+
+    DatasetHeader header = queryExecution.getDatasetHeader();
+    List<String> resultColumns = header.getRespColumns();
+    List<TSDataType> dataTypes = header.getRespDataTypes();
+    Map<String, Integer> headerMap = header.getColumnNameIndexMap();
+    // Register one name, one type name and one (initially empty) value list per result column.
+    for (int i = 0; i < resultColumns.size(); i++) {
+      targetDataSet.addColumnNamesItem(resultColumns.get(i));
+      targetDataSet.addDataTypesItem(dataTypes.get(i).name());
+      targetDataSet.addValuesItem(new ArrayList<>());
+    }
+    while (true) {
+      // The limit is checked before each fetch, so a result of exactly actualRowSizeLimit rows is
+      // also answered with the "exceeded" status.
+      if (0 < actualRowSizeLimit && actualRowSizeLimit <= fetched) {
+        return Response.ok()
+            .entity(
+                new ExecutionStatus()
+                    .code(TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode())
+                    .message(
+                        String.format(
+                            "Dataset row size exceeded the given max row size 
(%d)",
+                            actualRowSizeLimit)))
+            .build();
+      }
+      Optional<TsBlock> optionalTsBlock = queryExecution.getBatchResult();
+      // An absent or empty batch ends the reading.
+      if (!optionalTsBlock.isPresent() || optionalTsBlock.get().isEmpty()) {
+        if (fetched == 0) {
+          // Nothing was counted as fetched: answer with the header and an empty value list.
+          targetDataSet.setValues(new ArrayList<>());
+          return Response.ok().entity(targetDataSet).build();
+        }
+        break;
+      }
+      TsBlock tsBlock = optionalTsBlock.get();
+      int currentCount = tsBlock.getPositionCount();
+
+      for (int k = 0; k < resultColumns.size(); k++) {
+        Column column = tsBlock.getColumn(headerMap.get(resultColumns.get(k)));
+        List<Object> targetDataSetColumn = targetDataSet.getValues().get(k);
+        for (int i = 0; i < currentCount; i++) {
+          fetched++;
+          if (column.isNull(i)) {
+            targetDataSetColumn.add(null);
+          } else {
+            // TEXT values are decoded to String with TSFileConfig.STRING_CHARSET; every other
+            // type is added as returned by getObject.
+            targetDataSetColumn.add(
+                column.getDataType().equals(TSDataType.TEXT)
+                    ? 
column.getBinary(i).getStringValue(TSFileConfig.STRING_CHARSET)
+                    : column.getObject(i));
+          }
+        }
+        // fetched was bumped once per cell above; undo it for every column except index
+        // columnNum - 1 so that a batch contributes currentCount rows in total.
+        // NOTE(review): the loop is bounded by resultColumns.size() while this compensation uses
+        // getOutputValueColumnCount(); presumably both are equal - confirm, otherwise fetched
+        // never grows when columnNum exceeds the number of result columns.
+        if (k != columnNum - 1) {
+          fetched -= currentCount;
+        }
+      }
+    }
+    // Values were collected column-wise; the response carries them row-wise.
+    targetDataSet.setValues(convertColumnToRow(targetDataSet.getValues()));
+    return Response.ok().entity(targetDataSet).build();
+  }
+
+  /**
+   * Fills the response for a statement that is not a {@link Query}: prepares the target data set
+   * from the dataset header, then drains the execution into it.
+   *
+   * @param queryExecution execution whose header and batches are read
+   * @param actualRowSizeLimit row limit forwarded to {@code fillOtherQueryDataSet}
+   * @return the response built by {@code fillOtherQueryDataSet}
+   * @throws IoTDBException propagated from {@code fillOtherQueryDataSet}
+   */
+  private static Response fillOtherDataSet(
+      IQueryExecution queryExecution, final int actualRowSizeLimit) throws IoTDBException {
+    DatasetHeader header = queryExecution.getDatasetHeader();
+    // One slot per response column; filled by the init helper below.
+    int[] sourceIndexByTargetIndex = new int[header.getRespColumns().size()];
+    QueryDataSet dataSet = new QueryDataSet();
+    initTargetDatasetOrderByOrderWithSourceDataSet(header, sourceIndexByTargetIndex, dataSet);
+
+    return fillOtherQueryDataSet(
+        queryExecution, sourceIndexByTargetIndex, actualRowSizeLimit, dataSet);
+  }
+
+  /**
+   * Registers every response column of the header in the target data set (name, empty value list,
+   * data type name) and maps target column {@code i} to source column {@code i}.
+   *
+   * <p>Does nothing when the header has no response column list.
+   *
+   * @param datasetHeader header providing the response column names and data types
+   * @param targetDataSetIndexToSourceDataSetIndex output array, written at every column index
+   * @param targetDataSet data set receiving the column metadata
+   */
+  private static void initTargetDatasetOrderByOrderWithSourceDataSet(
+      DatasetHeader datasetHeader,
+      int[] targetDataSetIndexToSourceDataSetIndex,
+      QueryDataSet targetDataSet) {
+    List<String> columnNames = datasetHeader.getRespColumns();
+    if (columnNames == null) {
+      return;
+    }
+    int columnCount = columnNames.size();
+    for (int index = 0; index < columnCount; index++) {
+      targetDataSet.addColumnNamesItem(columnNames.get(index));
+      targetDataSet.addValuesItem(new ArrayList<>());
+      targetDataSet.addDataTypesItem(datasetHeader.getRespDataTypes().get(index).name());
+      // Identity mapping: target and source columns share the same order.
+      targetDataSetIndexToSourceDataSetIndex[index] = index;
+    }
+  }
+
+  /**
+   * Drains the batches of a non-query statement into {@code targetDataSet} and returns it with
+   * its values transposed into rows.
+   *
+   * <p>An absent or empty batch ends the reading. Rows collected from earlier batches are kept
+   * in that case (the same way the two-argument {@code fillQueryDataSet} behaves); previously an
+   * empty batch arriving after data had been read discarded everything fetched so far.
+   *
+   * @param queryExecution execution whose batches are read through {@code getBatchResult()}
+   * @param targetDataSetIndexToSourceDataSetIndex source column index for each target column
+   * @param actualRowSizeLimit row limit; only enforced when greater than 0
+   * @param targetDataSet data set whose value lists were already created, one per column
+   * @return a response carrying the filled data set, or an {@link ExecutionStatus} with code
+   *     {@code QUERY_PROCESS_ERROR} once the fetched row count has reached the limit
+   * @throws IoTDBException declared for the calls made on {@code queryExecution}
+   */
+  private static Response fillOtherQueryDataSet(
+      IQueryExecution queryExecution,
+      int[] targetDataSetIndexToSourceDataSetIndex,
+      int actualRowSizeLimit,
+      QueryDataSet targetDataSet)
+      throws IoTDBException {
+    int fetched = 0;
+    int columnNum = queryExecution.getOutputValueColumnCount();
+    while (true) {
+      // Checked before each fetch, as in the original implementation.
+      if (0 < actualRowSizeLimit && actualRowSizeLimit <= fetched) {
+        return Response.ok()
+            .entity(
+                new ExecutionStatus()
+                    .code(TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode())
+                    .message(
+                        String.format(
+                            "Dataset row size exceeded the given max row size (%d)",
+                            actualRowSizeLimit)))
+            .build();
+      }
+      Optional<TsBlock> optionalTsBlock = queryExecution.getBatchResult();
+      int currentCount = optionalTsBlock.map(TsBlock::getPositionCount).orElse(0);
+      if (currentCount == 0) {
+        // No (more) data: an absent batch and an empty batch both end the reading.
+        if (fetched == 0) {
+          targetDataSet.setValues(new ArrayList<>());
+          return Response.ok().entity(targetDataSet).build();
+        }
+        // Keep what was already collected instead of throwing it away.
+        break;
+      }
+      TsBlock tsBlock = optionalTsBlock.get();
+      for (int k = 0; k < columnNum; k++) {
+        Column column = tsBlock.getColumn(targetDataSetIndexToSourceDataSetIndex[k]);
+        List<Object> targetDataSetColumn = targetDataSet.getValues().get(k);
+        for (int i = 0; i < currentCount; i++) {
+          if (column.isNull(i)) {
+            targetDataSetColumn.add(null);
+          } else {
+            // TEXT values are decoded to String; other types are added as returned by getObject.
+            targetDataSetColumn.add(
+                column.getDataType().equals(TSDataType.TEXT)
+                    ? column.getBinary(i).getStringValue(TSFileConfig.STRING_CHARSET)
+                    : column.getObject(i));
+          }
+        }
+      }
+      // Count the rows of this batch once. Equivalent to the former per-cell increment with
+      // per-column compensation, which also left the counter untouched when columnNum was 0.
+      if (columnNum > 0) {
+        fetched += currentCount;
+      }
+    }
+    // Values were collected column-wise; the response carries them row-wise.
+    targetDataSet.setValues(convertColumnToRow(targetDataSet.getValues()));
+    return Response.ok().entity(targetDataSet).build();
+  }
+
+  /**
+   * Transposes column-wise values into row-wise values.
+   *
+   * @param values one list per column; the first column determines the number of rows
+   * @return one list per row, each holding the values of all columns at that row index; empty when
+   *     there is no column or the first column is empty
+   */
+  private static List<List<Object>> convertColumnToRow(List<List<Object>> values) {
+    List<List<Object>> rows = new ArrayList<>();
+    boolean hasData = !values.isEmpty() && !values.get(0).isEmpty();
+    if (hasData) {
+      int rowCount = values.get(0).size();
+      for (int rowIndex = 0; rowIndex < rowCount; rowIndex++) {
+        List<Object> row = new ArrayList<>();
+        for (int columnIndex = 0; columnIndex < values.size(); columnIndex++) {
+          row.add(values.get(columnIndex).get(rowIndex));
+        }
+        rows.add(row);
+      }
+    }
+    return rows;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/table/v1/handler/RequestValidationHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/table/v1/handler/RequestValidationHandler.java
new file mode 100644
index 00000000000..81624ba9031
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/table/v1/handler/RequestValidationHandler.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iotdb.db.protocol.rest.table.v1.handler;
+
+import org.apache.iotdb.db.protocol.rest.table.v1.model.InsertTabletRequest;
+import org.apache.iotdb.db.protocol.rest.table.v1.model.SQL;
+
+import org.apache.commons.lang3.Validate;
+import org.apache.tsfile.enums.TSDataType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Objects;
+
+public class RequestValidationHandler {
+
+  private RequestValidationHandler() {}
+
+  /**
+   * Validates a query request: applies {@link #validateSQL(SQL)} and additionally rejects an empty
+   * database name.
+   *
+   * @param sql the request body to check
+   * @throws IllegalArgumentException if the database name is an empty string
+   */
+  public static void validateQuerySQL(SQL sql) {
+    validateSQL(sql);
+    String database = sql.getDatabase();
+    if (!database.isEmpty()) {
+      return;
+    }
+    throw new IllegalArgumentException("database should not be an empty string");
+  }
+
+  /**
+   * Validates the fields shared by all SQL requests: {@code sql} and {@code database} must be
+   * present, and {@code row_limit}, when given, must be positive.
+   *
+   * @param sql the request body to check
+   * @throws NullPointerException if the sql text or the database is null
+   * @throws IllegalArgumentException if a row limit is given and is not greater than 0
+   */
+  public static void validateSQL(SQL sql) {
+    Objects.requireNonNull(sql.getSql(), "sql should not be null");
+    Objects.requireNonNull(sql.getDatabase(), "database should not be null");
+    // row_limit is optional; only a supplied value is range-checked.
+    if (sql.getRowLimit() == null) {
+      return;
+    }
+    Validate.isTrue(sql.getRowLimit() > 0, "row_limit should be positive");
+  }
+
+  /**
+   * Validates an insertTablet request.
+   *
+   * <p>All top-level fields must be present. The remaining checks (matching list sizes, known data
+   * type names, row widths) are collected and reported together in a single exception.
+   *
+   * @param insertTabletRequest the request body to check
+   * @throws NullPointerException if database, table, column_names, column_types, data_types,
+   *     timestamps or values is null
+   * @throws RuntimeException carrying all collected messages joined by "," when any consistency
+   *     check fails
+   */
+  public static void validateInsertTabletRequest(InsertTabletRequest insertTabletRequest) {
+    Objects.requireNonNull(insertTabletRequest.getDatabase(), "database should not be null");
+    Objects.requireNonNull(insertTabletRequest.getTable(), "table should not be null");
+    Objects.requireNonNull(insertTabletRequest.getColumnNames(), "column_names should not be null");
+    Objects.requireNonNull(insertTabletRequest.getColumnTypes(), "column_types should not be null");
+    Objects.requireNonNull(insertTabletRequest.getDataTypes(), "data_types should not be null");
+    Objects.requireNonNull(insertTabletRequest.getTimestamps(), "timestamps should not be null");
+    Objects.requireNonNull(insertTabletRequest.getValues(), "values should not be null");
+    List<String> errorMessages = new ArrayList<>();
+    String table = insertTabletRequest.getTable();
+    List<String> dataTypes = insertTabletRequest.getDataTypes();
+    List<List<Object>> rows = insertTabletRequest.getValues();
+    int columnTypeCount = insertTabletRequest.getColumnTypes().size();
+
+    if (columnTypeCount == 0 || columnTypeCount != insertTabletRequest.getColumnNames().size()) {
+      errorMessages.add("column_names and column_types should have the same size");
+    }
+    if (columnTypeCount != dataTypes.size()) {
+      errorMessages.add("column_types and data_types should have the same size");
+    }
+    // One timestamp per row. The message used to name values/data_types, which is not what is
+    // compared here.
+    if (insertTabletRequest.getTimestamps().size() != rows.size()) {
+      errorMessages.add("timestamps and values should have the same size");
+    }
+
+    for (String dataType : dataTypes) {
+      if (isIllegalDataType(dataType)) {
+        errorMessages.add("The " + dataType + " data type of " + table + " is illegal");
+      }
+    }
+
+    int dataTypeSize = dataTypes.size();
+    for (int i = 0; i < rows.size(); i++) {
+      List<Object> row = rows.get(i);
+      // A null row is reported like a row of the wrong width instead of failing with an NPE.
+      if (row == null || dataTypeSize != row.size()) {
+        errorMessages.add(
+            "The number of values in the " + i + "th row is not equal to the data_types size");
+      }
+    }
+
+    if (!errorMessages.isEmpty()) {
+      throw new RuntimeException(String.join(",", errorMessages));
+    }
+  }
+
+  /**
+   * Tells whether a data type name cannot be resolved to a {@link TSDataType} constant.
+   *
+   * <p>The name is upper-cased with {@link Locale#ROOT} so the result does not depend on the
+   * default locale of the JVM.
+   *
+   * @param dataType data type name from the request, may be null
+   * @return {@code true} if the name is null or matches no {@link TSDataType} constant
+   */
+  private static boolean isIllegalDataType(String dataType) {
+    if (dataType == null) {
+      return true;
+    }
+    try {
+      TSDataType.valueOf(dataType.toUpperCase(Locale.ROOT));
+      return false;
+    } catch (IllegalArgumentException e) {
+      // valueOf rejects unknown names; that is exactly the "illegal" case.
+      return true;
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/table/v1/handler/StatementConstructionHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/table/v1/handler/StatementConstructionHandler.java
new file mode 100644
index 00000000000..a9b770915c5
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/table/v1/handler/StatementConstructionHandler.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iotdb.db.protocol.rest.table.v1.handler;
+
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
+import org.apache.iotdb.db.exception.WriteProcessRejectException;
+import org.apache.iotdb.db.protocol.rest.table.v1.model.InsertTabletRequest;
+import 
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+import org.apache.iotdb.db.utils.TimestampPrecisionUtils;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.BitMap;
+import org.apache.tsfile.write.record.Tablet;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Locale;
+
+public class StatementConstructionHandler {
+
+  private static final DataNodeDevicePathCache DEVICE_PATH_CACHE =
+      DataNodeDevicePathCache.getInstance();
+
+  private StatementConstructionHandler() {}
+
+  /**
+   * Converts a REST insertTablet request into an {@link InsertTabletStatement} that writes to a
+   * table.
+   *
+   * <p>The row-wise request values are converted into one typed array per column, together with a
+   * bitmap per column in which every null cell is marked.
+   *
+   * @param insertTabletReq the request; the number of data types and the width of every row are
+   *     expected to match the number of column names
+   * @return the populated statement (not aligned, write-to-table enabled)
+   * @throws IllegalPathException declared for resolving the table name into a device path
+   * @throws WriteProcessRejectException if a BOOLEAN, INT32/DATE or INT64/TIMESTAMP cell holds a
+   *     value of an unsupported Java type
+   * @throws IllegalArgumentException if a data type name is unknown or not handled here
+   */
+  public static InsertTabletStatement constructInsertTabletStatement(
+      InsertTabletRequest insertTabletReq)
+      throws IllegalPathException, WriteProcessRejectException {
+    InsertTabletStatement insertStatement = new InsertTabletStatement();
+    insertStatement.setDevicePath(DEVICE_PATH_CACHE.getPartialPath(insertTabletReq.getTable()));
+    insertStatement.setMeasurements(insertTabletReq.getColumnNames().toArray(new String[0]));
+    long[] timestamps =
+        insertTabletReq.getTimestamps().stream().mapToLong(Long::longValue).toArray();
+    if (timestamps.length != 0) {
+      // Only the last timestamp is passed to the precision check.
+      TimestampPrecisionUtils.checkTimestampPrecision(timestamps[timestamps.length - 1]);
+    }
+    insertStatement.setTimes(timestamps);
+    int columnSize = insertTabletReq.getColumnNames().size();
+    int rowSize = insertTabletReq.getTimestamps().size();
+    List<List<Object>> rawData = insertTabletReq.getValues();
+    Object[] columns = new Object[columnSize];
+    BitMap[] bitMaps = new BitMap[columnSize];
+    List<String> rawDataType = insertTabletReq.getDataTypes();
+    TSDataType[] dataTypes = new TSDataType[columnSize];
+
+    for (int i = 0; i < columnSize; i++) {
+      dataTypes[i] = TSDataType.valueOf(rawDataType.get(i).toUpperCase(Locale.ROOT));
+    }
+
+    // Transpose the row-wise request into one primitive/Binary array per column.
+    for (int columnIndex = 0; columnIndex < columnSize; columnIndex++) {
+      bitMaps[columnIndex] = new BitMap(rowSize);
+      switch (dataTypes[columnIndex]) {
+        case BOOLEAN:
+          boolean[] booleanValues = new boolean[rowSize];
+          for (int rowIndex = 0; rowIndex < rowSize; rowIndex++) {
+            Object data = rawData.get(rowIndex).get(columnIndex);
+            if (data == null) {
+              bitMaps[columnIndex].mark(rowIndex);
+            } else if ("1".equals(data.toString())) {
+              booleanValues[rowIndex] = true;
+            } else if ("0".equals(data.toString())) {
+              booleanValues[rowIndex] = false;
+            } else if (data instanceof Boolean) {
+              booleanValues[rowIndex] = (Boolean) data;
+            } else {
+              // Reject explicitly, like the integer branches, instead of failing on a blind cast.
+              throw new WriteProcessRejectException(
+                  "unsupported data type: " + data.getClass().toString());
+            }
+          }
+          columns[columnIndex] = booleanValues;
+          break;
+        case INT32:
+        case DATE:
+          int[] intValues = new int[rowSize];
+          for (int rowIndex = 0; rowIndex < rowSize; rowIndex++) {
+            Object object = rawData.get(rowIndex).get(columnIndex);
+            if (object == null) {
+              bitMaps[columnIndex].mark(rowIndex);
+            } else if (object instanceof Integer) {
+              intValues[rowIndex] = (int) object;
+            } else {
+              throw new WriteProcessRejectException(
+                  "unsupported data type: " + object.getClass().toString());
+            }
+          }
+          columns[columnIndex] = intValues;
+          break;
+        case INT64:
+        case TIMESTAMP:
+          long[] longValues = new long[rowSize];
+          for (int rowIndex = 0; rowIndex < rowSize; rowIndex++) {
+            Object object = rawData.get(rowIndex).get(columnIndex);
+            if (object == null) {
+              bitMaps[columnIndex].mark(rowIndex);
+            } else if (object instanceof Integer) {
+              // Integer cells are accepted and widened to long.
+              longValues[rowIndex] = (int) object;
+            } else if (object instanceof Long) {
+              longValues[rowIndex] = (long) object;
+            } else {
+              throw new WriteProcessRejectException(
+                  "unsupported data type: " + object.getClass().toString());
+            }
+          }
+          columns[columnIndex] = longValues;
+          break;
+        case FLOAT:
+          float[] floatValues = new float[rowSize];
+          for (int rowIndex = 0; rowIndex < rowSize; rowIndex++) {
+            Object data = rawData.get(rowIndex).get(columnIndex);
+            if (data == null) {
+              bitMaps[columnIndex].mark(rowIndex);
+            } else {
+              // Parsed from the string form of the cell, whatever its Java type is.
+              floatValues[rowIndex] = Float.parseFloat(String.valueOf(data));
+            }
+          }
+          columns[columnIndex] = floatValues;
+          break;
+        case DOUBLE:
+          double[] doubleValues = new double[rowSize];
+          for (int rowIndex = 0; rowIndex < rowSize; rowIndex++) {
+            if (rawData.get(rowIndex).get(columnIndex) == null) {
+              bitMaps[columnIndex].mark(rowIndex);
+            } else {
+              doubleValues[rowIndex] =
+                  Double.parseDouble(String.valueOf(rawData.get(rowIndex).get(columnIndex)));
+            }
+          }
+          columns[columnIndex] = doubleValues;
+          break;
+        case TEXT:
+        case BLOB:
+        case STRING:
+          Binary[] binaryValues = new Binary[rowSize];
+          for (int rowIndex = 0; rowIndex < rowSize; rowIndex++) {
+            if (rawData.get(rowIndex).get(columnIndex) == null) {
+              bitMaps[columnIndex].mark(rowIndex);
+              // Null cells still get a (zero-length) Binary; the bitmap carries the null-ness.
+              binaryValues[rowIndex] = new Binary("".getBytes(StandardCharsets.UTF_8));
+            } else {
+              binaryValues[rowIndex] =
+                  new Binary(
+                      rawData
+                          .get(rowIndex)
+                          .get(columnIndex)
+                          .toString()
+                          .getBytes(StandardCharsets.UTF_8));
+            }
+          }
+          columns[columnIndex] = binaryValues;
+          break;
+        default:
+          throw new IllegalArgumentException("Invalid input: " + rawDataType.get(columnIndex));
+      }
+    }
+    insertStatement.setColumns(columns);
+    insertStatement.setBitMaps(bitMaps);
+    insertStatement.setRowCount(rowSize);
+    insertStatement.setDataTypes(dataTypes);
+    insertStatement.setAligned(false);
+    insertStatement.setWriteToTable(true);
+    // Column categories are resolved by exact enum constant name (case-sensitive).
+    TsTableColumnCategory[] columnCategories =
+        new TsTableColumnCategory[insertTabletReq.getColumnTypes().size()];
+    for (int i = 0; i < columnCategories.length; i++) {
+      columnCategories[i] =
+          TsTableColumnCategory.fromTsFileColumnType(
+              Tablet.ColumnCategory.valueOf(insertTabletReq.getColumnTypes().get(i)));
+    }
+    insertStatement.setColumnCategories(columnCategories);
+
+    return insertStatement;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/table/v1/impl/RestApiServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/table/v1/impl/RestApiServiceImpl.java
new file mode 100644
index 00000000000..f881ef653c4
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/table/v1/impl/RestApiServiceImpl.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iotdb.db.protocol.rest.table.v1.impl;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.rest.IoTDBRestServiceDescriptor;
+import org.apache.iotdb.db.protocol.rest.table.v1.NotFoundException;
+import org.apache.iotdb.db.protocol.rest.table.v1.RestApiService;
+import org.apache.iotdb.db.protocol.rest.table.v1.handler.ExceptionHandler;
+import 
org.apache.iotdb.db.protocol.rest.table.v1.handler.ExecuteStatementHandler;
+import org.apache.iotdb.db.protocol.rest.table.v1.handler.QueryDataSetHandler;
+import 
org.apache.iotdb.db.protocol.rest.table.v1.handler.RequestValidationHandler;
+import 
org.apache.iotdb.db.protocol.rest.table.v1.handler.StatementConstructionHandler;
+import org.apache.iotdb.db.protocol.rest.table.v1.model.ExecutionStatus;
+import org.apache.iotdb.db.protocol.rest.table.v1.model.InsertTabletRequest;
+import org.apache.iotdb.db.protocol.rest.table.v1.model.SQL;
+import org.apache.iotdb.db.protocol.session.IClientSession;
+import org.apache.iotdb.db.protocol.session.SessionManager;
+import org.apache.iotdb.db.protocol.thrift.OperationType;
+import org.apache.iotdb.db.queryengine.plan.Coordinator;
+import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
+import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution;
+import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.parser.SqlParser;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+import org.apache.iotdb.db.utils.CommonUtils;
+import org.apache.iotdb.db.utils.SetThreadName;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.SecurityContext;
+
+import java.time.ZoneId;
+import java.util.List;
+import java.util.Optional;
+
+public class RestApiServiceImpl extends RestApiService {
+  private static final Coordinator COORDINATOR = Coordinator.getInstance();
+
+  private static final SessionManager SESSION_MANAGER = 
SessionManager.getInstance();
+
+  private static final IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
+
+  private final Integer defaultQueryRowLimit;
+
+  /**
+   * Creates the service and reads the default query row limit from the REST service
+   * configuration; the query endpoint uses it when a request carries no row limit of its own.
+   */
+  public RestApiServiceImpl() {
+    defaultQueryRowLimit =
+        
IoTDBRestServiceDescriptor.getInstance().getConfig().getRestQueryDefaultRowSizeLimit();
+  }
+
+  /**
+   * Executes a table-model query statement and returns its result set.
+   *
+   * <p>The request is validated, parsed with the TABLE dialect against the requested database and
+   * rejected with {@code EXECUTE_STATEMENT_ERROR} when the parsed statement is not one of the
+   * statement types registered as result-producing in {@link ExecuteStatementHandler}. Failures
+   * are reported in the response body; the HTTP response itself is always built with
+   * {@code Response.ok()}.
+   *
+   * @param sql request body carrying the sql text, the database and an optional row limit
+   * @param securityContext not used by this method
+   * @return the query result, or an execution status describing the failure
+   * @throws NotFoundException declared by the service contract; not thrown by this method
+   */
+  @Override
+  public Response executeQueryStatement(SQL sql, SecurityContext securityContext)
+      throws NotFoundException {
+    SqlParser relationSqlParser = new SqlParser();
+    Long queryId = null;
+    Statement statement = null;
+    long startTime = System.nanoTime();
+    try {
+      RequestValidationHandler.validateQuerySQL(sql);
+      IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+      clientSession.setDatabaseName(sql.getDatabase());
+      clientSession.setSqlDialect(IClientSession.SqlDialect.TABLE);
+      statement =
+          relationSqlParser.createStatement(sql.getSql(), ZoneId.systemDefault(), clientSession);
+      if (statement == null) {
+        return Response.ok()
+            .entity(
+                new org.apache.iotdb.db.protocol.rest.model.ExecutionStatus()
+                    .code(TSStatusCode.SQL_PARSE_ERROR.getStatusCode())
+                    .message("This operation type is not supported"))
+            .build();
+      }
+
+      // validateStatement is true for statements that are NOT in the result-producing set, so a
+      // true result means the statement does not belong on the query endpoint.
+      if (ExecuteStatementHandler.validateStatement(statement)) {
+        return Response.ok()
+            .entity(
+                new org.apache.iotdb.db.protocol.rest.model.ExecutionStatus()
+                    .code(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+                    .message(TSStatusCode.EXECUTE_STATEMENT_ERROR.name()))
+            .build();
+      }
+
+      queryId = SESSION_MANAGER.requestQueryId();
+      Metadata metadata = LocalExecutionPlanner.getInstance().metadata;
+
+      ExecutionResult result =
+          COORDINATOR.executeForTableModel(
+              statement,
+              relationSqlParser,
+              clientSession,
+              queryId,
+              SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
+              sql.getSql(),
+              metadata,
+              config.getQueryTimeoutThreshold(),
+              true);
+      if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+          && result.status.code != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+        return Response.ok()
+            .entity(
+                new ExecutionStatus()
+                    .code(result.status.getCode())
+                    .message(result.status.getMessage()))
+            .build();
+      }
+      IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
+      try (SetThreadName threadName = new SetThreadName(result.queryId.getId())) {
+        // A request without row_limit falls back to the configured default limit.
+        return QueryDataSetHandler.fillQueryDataSet(
+            queryExecution,
+            statement,
+            sql.getRowLimit() == null ? defaultQueryRowLimit : sql.getRowLimit());
+      }
+    } catch (Exception e) {
+      return Response.ok().entity(ExceptionHandler.tryCatchException(e)).build();
+    } finally {
+      long costTime = System.nanoTime() - startTime;
+      // Latency is only recorded when parsing produced a statement.
+      Optional.ofNullable(statement)
+          .ifPresent(
+              s ->
+                  CommonUtils.addStatementExecutionLatency(
+                      OperationType.EXECUTE_QUERY_STATEMENT, s.toString(), costTime));
+      if (queryId != null) {
+        COORDINATOR.cleanupQueryExecution(queryId);
+      }
+    }
+  }
+
+  /**
+   * Writes the rows of an insertTablet request into a table.
+   *
+   * <p>The request is validated and converted into an {@link InsertTabletStatement}, which is then
+   * executed with the TABLE dialect against the requested database. Failures are reported in the
+   * response body; the HTTP response itself is always built with {@code Response.ok()}.
+   *
+   * @param insertTabletRequest request body describing the target table, its columns and the rows
+   * @param securityContext not used by this method
+   * @return an execution status describing the outcome
+   * @throws NotFoundException declared by the service contract; not thrown by this method
+   */
+  @Override
+  public Response insertTablet(
+      InsertTabletRequest insertTabletRequest, SecurityContext securityContext)
+      throws NotFoundException {
+    Long queryId = null;
+    long startTime = System.nanoTime();
+    InsertTabletStatement insertTabletStatement = null;
+    try {
+      RequestValidationHandler.validateInsertTabletRequest(insertTabletRequest);
+      insertTabletStatement =
+          StatementConstructionHandler.constructInsertTabletStatement(insertTabletRequest);
+
+      IClientSession session = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+      session.setDatabaseName(insertTabletRequest.getDatabase());
+      session.setSqlDialect(IClientSession.SqlDialect.TABLE);
+
+      queryId = SESSION_MANAGER.requestQueryId();
+      Metadata metadata = LocalExecutionPlanner.getInstance().metadata;
+      SqlParser parser = new SqlParser();
+      ExecutionResult executionResult =
+          COORDINATOR.executeForTableModel(
+              insertTabletStatement,
+              parser,
+              session,
+              queryId,
+              SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
+              "",
+              metadata,
+              config.getQueryTimeoutThreshold());
+
+      return responseGenerateHelper(executionResult);
+    } catch (Exception e) {
+      return Response.ok().entity(ExceptionHandler.tryCatchException(e)).build();
+    } finally {
+      long elapsedNanos = System.nanoTime() - startTime;
+      // Latency is only recorded when the statement could be constructed.
+      if (insertTabletStatement != null) {
+        CommonUtils.addStatementExecutionLatency(
+            OperationType.INSERT_TABLET, insertTabletStatement.getType().name(), elapsedNanos);
+      }
+      if (queryId != null) {
+        COORDINATOR.cleanupQueryExecution(queryId);
+      }
+    }
+  }
+
+  /**
+   * Executes a table-model statement that does not return a result set.
+   *
+   * <p>The statement is parsed with the TABLE dialect against the requested database and rejected
+   * with {@code EXECUTE_STATEMENT_ERROR} when {@link ExecuteStatementHandler#validateStatement}
+   * returns false for it. Failures are reported in the response body; the HTTP response itself is
+   * always built with {@code Response.ok()}.
+   *
+   * @param sql request body carrying the sql text and the database
+   * @param securityContext not used by this method
+   * @return an execution status describing the outcome
+   * @throws NotFoundException declared by the service contract; not thrown by this method
+   */
+  @Override
+  public Response executeNonQueryStatement(SQL sql, SecurityContext securityContext)
+      throws NotFoundException {
+    SqlParser parser = new SqlParser();
+    Long queryId = null;
+    Statement statement = null;
+    long startTime = System.nanoTime();
+    try {
+      IClientSession session = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+      RequestValidationHandler.validateSQL(sql);
+      session.setDatabaseName(sql.getDatabase());
+      session.setSqlDialect(IClientSession.SqlDialect.TABLE);
+      statement = parser.createStatement(sql.getSql(), ZoneId.systemDefault(), session);
+
+      if (statement == null) {
+        return Response.ok()
+            .entity(
+                new org.apache.iotdb.db.protocol.rest.model.ExecutionStatus()
+                    .code(TSStatusCode.SQL_PARSE_ERROR.getStatusCode())
+                    .message("This operation type is not supported"))
+            .build();
+      }
+      boolean allowedHere = ExecuteStatementHandler.validateStatement(statement);
+      if (!allowedHere) {
+        return Response.ok()
+            .entity(
+                new org.apache.iotdb.db.protocol.rest.model.ExecutionStatus()
+                    .code(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+                    .message(TSStatusCode.EXECUTE_STATEMENT_ERROR.name()))
+            .build();
+      }
+
+      queryId = SESSION_MANAGER.requestQueryId();
+      Metadata metadata = LocalExecutionPlanner.getInstance().metadata;
+      ExecutionResult executionResult =
+          COORDINATOR.executeForTableModel(
+              statement,
+              parser,
+              session,
+              queryId,
+              SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
+              sql.getSql(),
+              metadata,
+              config.getQueryTimeoutThreshold(),
+              false);
+
+      int statusCode = executionResult.status.code;
+      boolean succeeded =
+          statusCode == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+              || statusCode == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode();
+      if (!succeeded) {
+        // Any failure is passed through with the code and message of the execution status.
+        return Response.ok()
+            .entity(
+                new ExecutionStatus()
+                    .code(executionResult.status.getCode())
+                    .message(executionResult.status.getMessage()))
+            .build();
+      }
+      return responseGenerateHelper(executionResult);
+    } catch (Exception e) {
+      return Response.ok().entity(ExceptionHandler.tryCatchException(e)).build();
+    } finally {
+      long elapsedNanos = System.nanoTime() - startTime;
+      // Latency is only recorded when parsing produced a statement.
+      if (statement != null) {
+        CommonUtils.addStatementExecutionLatency(
+            OperationType.EXECUTE_NON_QUERY_PLAN, statement.toString(), elapsedNanos);
+      }
+      if (queryId != null) {
+        COORDINATOR.cleanupQueryExecution(queryId);
+      }
+    }
+  }
+
+  /**
+   * Maps an execution result onto the REST response body.
+   *
+   * <ul>
+   *   <li>{@code SUCCESS_STATUS} and {@code REDIRECTION_RECOMMEND} are both reported as success.
+   *   <li>{@code MULTIPLE_ERROR} is reported with the messages of all failed sub statuses, each
+   *       followed by "; ".
+   *   <li>Any other code is passed through together with its message.
+   * </ul>
+   *
+   * @param result the execution result to translate
+   * @return a response built with {@code Response.ok()} carrying the execution status
+   */
+  private Response responseGenerateHelper(ExecutionResult result) {
+    final int successCode = TSStatusCode.SUCCESS_STATUS.getStatusCode();
+    final int redirectCode = TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode();
+
+    if (result.status.code == successCode || result.status.code == redirectCode) {
+      ExecutionStatus success =
+          new ExecutionStatus().code(successCode).message(TSStatusCode.SUCCESS_STATUS.name());
+      return Response.ok().entity(success).build();
+    }
+
+    if (result.status.code != TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+      ExecutionStatus passThrough =
+          new ExecutionStatus().code(result.status.getCode()).message(result.status.getMessage());
+      return Response.ok().entity(passThrough).build();
+    }
+
+    // MULTIPLE_ERROR: concatenate the messages of the sub statuses that did not succeed.
+    StringBuilder failures = new StringBuilder();
+    List<TSStatus> subStatuses = result.status.getSubStatus();
+    for (TSStatus sub : subStatuses) {
+      boolean subSucceeded = sub.getCode() == successCode || sub.getCode() == redirectCode;
+      if (!subSucceeded) {
+        failures.append(sub.getMessage()).append("; ");
+      }
+    }
+    ExecutionStatus multipleError =
+        new ExecutionStatus()
+            .code(TSStatusCode.MULTIPLE_ERROR.getStatusCode())
+            .message(failures.toString());
+    return Response.ok().entity(multipleError).build();
+  }
+}
diff --git a/iotdb-protocol/openapi/pom.xml b/iotdb-protocol/openapi/pom.xml
index 9b42d36494a..c82eebcc0cf 100644
--- a/iotdb-protocol/openapi/pom.xml
+++ b/iotdb-protocol/openapi/pom.xml
@@ -163,6 +163,33 @@
                             </configOptions>
                         </configuration>
                     </execution>
+                    <execution> <!-- Generates Java sources for the table-model REST API from iotdb_rest_table_v1.yaml with the jaxrs-jersey generator, in the generate-sources phase; the output directory is added as a compile source root. -->
+                        <id>generate-java-rest-codes-table-v1</id>
+                        <goals>
+                            <goal>generate</goal>
+                        </goals>
+                        <phase>generate-sources</phase>
+                        <configuration>
+                            
<inputSpec>${project.basedir}/src/main/openapi3/iotdb_rest_table_v1.yaml</inputSpec>
+                            
<output>${project.build.directory}/generated-sources/java</output>
+                            
<apiPackage>org.apache.iotdb.db.protocol.rest.table.v1</apiPackage>
+                            
<modelPackage>org.apache.iotdb.db.protocol.rest.table.v1.model</modelPackage>
+                            
<invokerPackage>org.apache.iotdb.db.protocol.rest.table.v1.invoker</invokerPackage>
+                            <generatorName>jaxrs-jersey</generatorName>
+                            <groupId>org.apache.iotdb</groupId>
+                            <artifactId>iotdb-rest-service</artifactId>
+                            
<artifactVersion>${project.version}</artifactVersion>
+                            <addCompileSourceRoot>true</addCompileSourceRoot>
+                            <configOptions>
+                                <licenseName>Apache License 2.0</licenseName>
+                                <groupId>org.apache.iotdb</groupId>
+                                <artifactId>iotdb-rest-service</artifactId>
+                                
<artifactVersion>${project.version}</artifactVersion>
+                                <dateLibrary>java8</dateLibrary>
+                                <useGzipFeature>true</useGzipFeature>
+                            </configOptions>
+                        </configuration>
+                    </execution>
                 </executions>
             </plugin>
             <plugin>
diff --git a/iotdb-protocol/openapi/src/main/openapi3/iotdb_rest_table_v1.yaml 
b/iotdb-protocol/openapi/src/main/openapi3/iotdb_rest_table_v1.yaml
new file mode 100644
index 00000000000..a02784d4cbf
--- /dev/null
+++ b/iotdb-protocol/openapi/src/main/openapi3/iotdb_rest_table_v1.yaml
@@ -0,0 +1,167 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+openapi: 3.0.0
+info:
+  title: iotdb_rest_table_v1
+  description: IoTDB Rest API.
+  license:
+    name: Apache 2.0
+    url: https://www.apache.org/licenses/LICENSE-2.0.html
+  version: 1.0.0
+servers:
+  - url: http://127.0.0.1:18080/
+    description: api
+security:
+  - basic: []
+paths:
+  /rest/table/v1/insertTablet:  # Table-model tablet write endpoint (POST only).
+    post:
+      summary: insertTablet
+      description: insertTablet
+      operationId: insertTablet
+      requestBody:  # JSON payload described by the InsertTabletRequest schema.
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/InsertTabletRequest'
+      responses:  # Only a 200 response is declared; its body is an ExecutionStatus.
+        "200":
+          description: ExecutionStatus
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/ExecutionStatus'
+
+  /rest/table/v1/nonQuery:  # Table-model endpoint for non-query statements (POST only).
+    post:
+      summary: executeNonQueryStatement
+      description: executeNonQueryStatement
+      operationId: executeNonQueryStatement
+      requestBody:  # JSON payload described by the SQL schema.
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/SQL'
+      responses:  # Only a 200 response is declared; its body is an ExecutionStatus.
+        "200":
+          description: ExecutionStatus
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/ExecutionStatus'
+
+  /rest/table/v1/query:  # Table-model endpoint for query statements (POST only).
+    post:
+      summary: executeQueryStatement
+      description: executeQueryStatement
+      operationId: executeQueryStatement
+      requestBody:  # JSON payload described by the SQL schema.
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/SQL'
+      responses:  # Only a 200 response is declared; its body is a QueryDataSet.
+        "200":
+          description: QueryDataSet
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/QueryDataSet'
+
+components:
+  schemas:
+    SQL:  # Request body shared by the nonQuery and query endpoints; no property is declared required.
+      title: SQL
+      type: object
+      properties:
+        database:
+          type: string
+        sql:
+          type: string
+        row_limit:  # NOTE(review): presumably caps the number of rows a query returns - confirm against the handler.
+          type: integer
+          format: int32
+
+    InsertTabletRequest:  # Request body of the insertTablet endpoint; no property is declared required.
+      title: InsertTabletRequest
+      type: object
+      properties:
+        timestamps:  # Array of 64-bit integers.
+          type: array
+          items:
+            type: integer
+            format: int64
+        column_names:  # NOTE(review): column_names, column_types and data_types look like parallel per-column arrays - confirm against the handler.
+          type: array
+          items:
+            type: string
+        column_types:
+          type: array
+          items:
+            type: string
+        data_types:
+          type: array
+          items:
+            type: string
+        values:  # Two-level array of untyped objects; NOTE(review): row-major vs. column-major layout is not stated in this spec - confirm.
+          type: array
+          items:
+            type: array
+            items:
+              type: object
+        table:
+          type: string
+        database:
+          type: string
+
+    ExecutionStatus:  # Response body of the insertTablet and nonQuery endpoints: an integer code plus a text message.
+      type: object
+      properties:
+        code:
+          type: integer
+          format: int32
+        message:
+          type: string
+
+    QueryDataSet:  # Response body of the query endpoint.
+      type: object
+      properties:
+        column_names:
+          type: array
+          items:
+            type: string
+        data_types:
+          type: array
+          items:
+            type: string
+        values:  # Two-level array of untyped objects; NOTE(review): whether the outer array is per row or per column is not stated in this spec - confirm.
+          type: array
+          items:
+            type: array
+            items:
+              type: object
+  securitySchemes:
+    basic:  # HTTP Basic authentication; this is the scheme listed in the top-level `security` section.
+      type: http
+      scheme: basic
+    APIKey:  # Declared here but not listed in the top-level `security` section of this spec.
+      type: apiKey
+      name: API Key  # NOTE(review): HTTP header field names cannot contain spaces - confirm whether this scheme is meant to be usable.
+      in: header


Reply via email to