This is an automated email from the ASF dual-hosted git repository.
critas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new c059bcc2068 Feature add support for query, nonQuery, and insertTablet
interfaces for table models in the REST service (#14165)
c059bcc2068 is described below
commit c059bcc2068403c8faac96885ba2775804076511
Author: CloudWise-Lukemiao <[email protected]>
AuthorDate: Mon Dec 23 17:38:53 2024 +0800
Feature add support for query, nonQuery, and insertTablet interfaces for
table models in the REST service (#14165)
* Feature add support for query, nonQuery, and insertTablet interfaces for
table models in the REST service.
* Feature add support for query, nonQuery, and insertTablet interfaces for
table models in the REST service.
* Feature add support for query, nonQuery, and insertTablet interfaces for
table models in the REST service.
* Remove useless code
* Remove useless code
* Fix IT failure issue
* Fix IT failure issue
* Fix IT failure issue
* Fix IT failure issue
* Fix IT failure issue
* add example
---
.../java/org/apache/iotdb/TableHttpExample.java | 223 ++++++++++
.../java/org/apache/iotdb/TableHttpsExample.java | 223 ++++++++++
.../it/rest/it/IoTDBRestServiceCaseWhenThenIT.java | 492 +++++++++++++++++++++
.../it/rest/it/IoTDBRestServiceFlushQueryIT.java | 311 +++++++++++++
.../relational/it/rest/it/IoTDBRestServiceIT.java | 350 +++++++++++++++
.../it/IoTDBRestServiceInsertAlignedValuesIT.java | 366 +++++++++++++++
.../iotdb/relational/it/rest/it/RestUtils.java | 128 ++++++
.../rest/table/v1/handler/ExceptionHandler.java | 91 ++++
.../table/v1/handler/ExecuteStatementHandler.java | 84 ++++
.../rest/table/v1/handler/QueryDataSetHandler.java | 219 +++++++++
.../table/v1/handler/RequestValidationHandler.java | 100 +++++
.../v1/handler/StatementConstructionHandler.java | 188 ++++++++
.../rest/table/v1/impl/RestApiServiceImpl.java | 293 ++++++++++++
iotdb-protocol/openapi/pom.xml | 27 ++
.../src/main/openapi3/iotdb_rest_table_v1.yaml | 167 +++++++
15 files changed, 3262 insertions(+)
diff --git
a/example/rest-java-example/src/main/java/org/apache/iotdb/TableHttpExample.java
b/example/rest-java-example/src/main/java/org/apache/iotdb/TableHttpExample.java
new file mode 100644
index 00000000000..fb223ae7b91
--- /dev/null
+++
b/example/rest-java-example/src/main/java/org/apache/iotdb/TableHttpExample.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb;
+
+import com.google.gson.JsonParser;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.util.EntityUtils;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+
+public class TableHttpExample {
+
+ private static final String UTF8 = "utf-8";
+
+ private String getAuthorization(String username, String password) {
+ return Base64.getEncoder()
+ .encodeToString((username + ":" +
password).getBytes(StandardCharsets.UTF_8));
+ }
+
+ public static void main(String[] args) {
+ TableHttpExample httpExample = new TableHttpExample();
+ httpExample.ping();
+ httpExample.createDatabase();
+ httpExample.createTable();
+ httpExample.nonQuery();
+ httpExample.insertTablet();
+ httpExample.query();
+ }
+
+ public void ping() {
+ CloseableHttpClient httpClient = SSLClient.getInstance().getHttpClient();
+ HttpGet httpGet = new HttpGet("http://127.0.0.1:18080/ping");
+ CloseableHttpResponse response = null;
+ try {
+ response = httpClient.execute(httpGet);
+ HttpEntity responseEntity = response.getEntity();
+ String message = EntityUtils.toString(responseEntity, UTF8);
+ String result =
JsonParser.parseString(message).getAsJsonObject().toString();
+ System.out.println(result);
+ } catch (IOException e) {
+ e.printStackTrace();
+ System.out.println("The ping rest api failed");
+ } finally {
+ try {
+ httpClient.close();
+ if (response != null) {
+ response.close();
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ System.out.println("Http Client close error");
+ }
+ }
+ }
+
+ private HttpPost getHttpPost(String url) {
+ HttpPost httpPost = new HttpPost(url);
+ httpPost.addHeader("Content-type", "application/json; charset=utf-8");
+ httpPost.setHeader("Accept", "application/json");
+ String authorization = getAuthorization("root", "root");
+ httpPost.setHeader("Authorization", authorization);
+ return httpPost;
+ }
+
+ public void insertTablet() {
+ CloseableHttpClient httpClient = SSLClient.getInstance().getHttpClient();
+ CloseableHttpResponse response = null;
+ try {
+ HttpPost httpPost =
getHttpPost("http://127.0.0.1:18080/rest/table/v1/insertTablet");
+ String json =
+
"{\"database\":\"test\",\"column_types\":[\"ID\",\"ATTRIBUTE\",\"MEASUREMENT\"],\"timestamps\":[1635232143960,1635232153960,1635232163960,1635232173960,1635232183960],\"column_names\":[\"id1\",\"t1\",\"s1\"],\"data_types\":[\"STRING\",\"STRING\",\"FLOAT\"],\"values\":[[\"a11\",\"true\",11333],[\"a11\",\"false\",22333],[\"a13\",\"false1\",23333],[\"a14\",\"false2\",24],[\"a15\",\"false3\",25]],\"table\":\"sg211\"}";
+ httpPost.setEntity(new StringEntity(json, Charset.defaultCharset()));
+ response = httpClient.execute(httpPost);
+ HttpEntity responseEntity = response.getEntity();
+ String message = EntityUtils.toString(responseEntity, UTF8);
+ String result =
JsonParser.parseString(message).getAsJsonObject().toString();
+ System.out.println(result);
+ } catch (IOException e) {
+ e.printStackTrace();
+ System.out.println("The insertTablet rest api failed");
+ } finally {
+ try {
+ if (response != null) {
+ response.close();
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ System.out.println("Response close error");
+ }
+ }
+ }
+
+ public void nonQuery() {
+ CloseableHttpClient httpClient = SSLClient.getInstance().getHttpClient();
+ CloseableHttpResponse response = null;
+ try {
+ HttpPost httpPost =
getHttpPost("http://127.0.0.1:18080/rest/table/v1/nonQuery");
+ String sql =
+ "{\"database\":\"test\",\"sql\":\"INSERT INTO sg211(time, id1, s1)
values(100, 'd1', 0)\"}";
+ httpPost.setEntity(new StringEntity(sql, Charset.defaultCharset()));
+ response = httpClient.execute(httpPost);
+ HttpEntity responseEntity = response.getEntity();
+ String message = EntityUtils.toString(responseEntity, UTF8);
+
System.out.println(JsonParser.parseString(message).getAsJsonObject().toString());
+ } catch (IOException e) {
+ e.printStackTrace();
+ System.out.println("The non query rest api failed");
+ } finally {
+ try {
+ if (response != null) {
+ response.close();
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ System.out.println("Response close error");
+ }
+ }
+ }
+
+ public void createDatabase() {
+ CloseableHttpClient httpClient = SSLClient.getInstance().getHttpClient();
+ CloseableHttpResponse response = null;
+ try {
+ HttpPost httpPost =
getHttpPost("http://127.0.0.1:18080/rest/table/v1/nonQuery");
+ String sql = "{\"database\":\"\",\"sql\":\"create database if not exists
test\"}";
+ httpPost.setEntity(new StringEntity(sql, Charset.defaultCharset()));
+ response = httpClient.execute(httpPost);
+ HttpEntity responseEntity = response.getEntity();
+ String message = EntityUtils.toString(responseEntity, UTF8);
+
System.out.println(JsonParser.parseString(message).getAsJsonObject().toString());
+ } catch (IOException e) {
+ e.printStackTrace();
+ System.out.println("The non query rest api failed");
+ } finally {
+ try {
+ if (response != null) {
+ response.close();
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ System.out.println("Response close error");
+ }
+ }
+ }
+
+ public void createTable() {
+ CloseableHttpClient httpClient = SSLClient.getInstance().getHttpClient();
+ CloseableHttpResponse response = null;
+ try {
+ HttpPost httpPost =
getHttpPost("http://127.0.0.1:18080/rest/table/v1/nonQuery");
+ String sql =
+ "{\"database\":\"test\",\"sql\":\"create table sg211 (id1 string
id,t1 STRING ATTRIBUTE, s1 FLOAT measurement)\"}";
+ httpPost.setEntity(new StringEntity(sql, Charset.defaultCharset()));
+ response = httpClient.execute(httpPost);
+ HttpEntity responseEntity = response.getEntity();
+ String message = EntityUtils.toString(responseEntity, UTF8);
+
System.out.println(JsonParser.parseString(message).getAsJsonObject().toString());
+ } catch (IOException e) {
+ e.printStackTrace();
+ System.out.println("create table failed");
+ } finally {
+ try {
+ if (response != null) {
+ response.close();
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ System.out.println("Response close error");
+ }
+ }
+ }
+
+ public void query() {
+ CloseableHttpClient httpClient = SSLClient.getInstance().getHttpClient();
+ CloseableHttpResponse response = null;
+ try {
+ HttpPost httpPost =
getHttpPost("http://127.0.0.1:18080/rest/table/v1/query");
+ String sql = "{\"database\":\"test\",\"sql\":\"select * from sg211\"}";
+ httpPost.setEntity(new StringEntity(sql, Charset.defaultCharset()));
+ response = httpClient.execute(httpPost);
+ HttpEntity responseEntity = response.getEntity();
+ String message = EntityUtils.toString(responseEntity, UTF8);
+
System.out.println(JsonParser.parseString(message).getAsJsonObject().toString());
+ } catch (IOException e) {
+ e.printStackTrace();
+ System.out.println("The query rest api failed");
+ } finally {
+ try {
+ if (response != null) {
+ response.close();
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ System.out.println("Response close error");
+ }
+ }
+ }
+}
diff --git
a/example/rest-java-example/src/main/java/org/apache/iotdb/TableHttpsExample.java
b/example/rest-java-example/src/main/java/org/apache/iotdb/TableHttpsExample.java
new file mode 100644
index 00000000000..aa5e4afd76d
--- /dev/null
+++
b/example/rest-java-example/src/main/java/org/apache/iotdb/TableHttpsExample.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb;
+
+import com.google.gson.JsonParser;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.util.EntityUtils;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+
+public class TableHttpsExample {
+
+ private static final String UTF8 = "utf-8";
+
+ private String getAuthorization(String username, String password) {
+ return Base64.getEncoder()
+ .encodeToString((username + ":" +
password).getBytes(StandardCharsets.UTF_8));
+ }
+
+ public static void main(String[] args) {
+ TableHttpsExample httpExample = new TableHttpsExample();
+ httpExample.ping();
+ httpExample.createDatabase();
+ httpExample.createTable();
+ httpExample.nonQuery();
+ httpExample.insertTablet();
+ httpExample.query();
+ }
+
+ public void ping() {
+ CloseableHttpClient httpClient = SSLClient.getInstance().getHttpClient();
+ HttpGet httpGet = new HttpGet("https://127.0.0.1:18080/ping");
+ CloseableHttpResponse response = null;
+ try {
+ response = httpClient.execute(httpGet);
+ HttpEntity responseEntity = response.getEntity();
+ String message = EntityUtils.toString(responseEntity, UTF8);
+ String result =
JsonParser.parseString(message).getAsJsonObject().toString();
+ System.out.println(result);
+ } catch (IOException e) {
+ e.printStackTrace();
+ System.out.println("The ping rest api failed");
+ } finally {
+ try {
+ httpClient.close();
+ if (response != null) {
+ response.close();
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ System.out.println("Http Client close error");
+ }
+ }
+ }
+
+ private HttpPost getHttpPost(String url) {
+ HttpPost httpPost = new HttpPost(url);
+ httpPost.addHeader("Content-type", "application/json; charset=utf-8");
+ httpPost.setHeader("Accept", "application/json");
+ String authorization = getAuthorization("root", "root");
+ httpPost.setHeader("Authorization", authorization);
+ return httpPost;
+ }
+
+ public void insertTablet() {
+ CloseableHttpClient httpClient = SSLClient.getInstance().getHttpClient();
+ CloseableHttpResponse response = null;
+ try {
+ HttpPost httpPost =
getHttpPost("https://127.0.0.1:18080/rest/table/v1/insertTablet");
+ String json =
+
"{\"database\":\"test\",\"column_types\":[\"ID\",\"ATTRIBUTE\",\"MEASUREMENT\"],\"timestamps\":[1635232143960,1635232153960,1635232163960,1635232173960,1635232183960],\"column_names\":[\"id1\",\"t1\",\"s1\"],\"data_types\":[\"STRING\",\"STRING\",\"FLOAT\"],\"values\":[[\"a11\",\"true\",11333],[\"a11\",\"false\",22333],[\"a13\",\"false1\",23333],[\"a14\",\"false2\",24],[\"a15\",\"false3\",25]],\"table\":\"sg211\"}";
+ httpPost.setEntity(new StringEntity(json, Charset.defaultCharset()));
+ response = httpClient.execute(httpPost);
+ HttpEntity responseEntity = response.getEntity();
+ String message = EntityUtils.toString(responseEntity, UTF8);
+ String result =
JsonParser.parseString(message).getAsJsonObject().toString();
+ System.out.println(result);
+ } catch (IOException e) {
+ e.printStackTrace();
+ System.out.println("The insertTablet rest api failed");
+ } finally {
+ try {
+ if (response != null) {
+ response.close();
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ System.out.println("Response close error");
+ }
+ }
+ }
+
+ public void nonQuery() {
+ CloseableHttpClient httpClient = SSLClient.getInstance().getHttpClient();
+ CloseableHttpResponse response = null;
+ try {
+ HttpPost httpPost =
getHttpPost("https://127.0.0.1:18080/rest/table/v1/nonQuery");
+ String sql =
+ "{\"database\":\"test\",\"sql\":\"INSERT INTO sg211(time, id1, s1)
values(100, 'd1', 0)\"}";
+ httpPost.setEntity(new StringEntity(sql, Charset.defaultCharset()));
+ response = httpClient.execute(httpPost);
+ HttpEntity responseEntity = response.getEntity();
+ String message = EntityUtils.toString(responseEntity, UTF8);
+
System.out.println(JsonParser.parseString(message).getAsJsonObject().toString());
+ } catch (IOException e) {
+ e.printStackTrace();
+ System.out.println("The non query rest api failed");
+ } finally {
+ try {
+ if (response != null) {
+ response.close();
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ System.out.println("Response close error");
+ }
+ }
+ }
+
+ public void createDatabase() {
+ CloseableHttpClient httpClient = SSLClient.getInstance().getHttpClient();
+ CloseableHttpResponse response = null;
+ try {
+ HttpPost httpPost =
getHttpPost("https://127.0.0.1:18080/rest/table/v1/nonQuery");
+ String sql = "{\"database\":\"\",\"sql\":\"create database if not exists
test\"}";
+ httpPost.setEntity(new StringEntity(sql, Charset.defaultCharset()));
+ response = httpClient.execute(httpPost);
+ HttpEntity responseEntity = response.getEntity();
+ String message = EntityUtils.toString(responseEntity, UTF8);
+
System.out.println(JsonParser.parseString(message).getAsJsonObject().toString());
+ } catch (IOException e) {
+ e.printStackTrace();
+ System.out.println("The non query rest api failed");
+ } finally {
+ try {
+ if (response != null) {
+ response.close();
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ System.out.println("Response close error");
+ }
+ }
+ }
+
+ public void createTable() {
+ CloseableHttpClient httpClient = SSLClient.getInstance().getHttpClient();
+ CloseableHttpResponse response = null;
+ try {
+ HttpPost httpPost =
getHttpPost("https://127.0.0.1:18080/rest/table/v1/nonQuery");
+ String sql =
+ "{\"database\":\"test\",\"sql\":\"create table sg211 (id1 string
id,t1 STRING ATTRIBUTE, s1 FLOAT measurement)\"}";
+ httpPost.setEntity(new StringEntity(sql, Charset.defaultCharset()));
+ response = httpClient.execute(httpPost);
+ HttpEntity responseEntity = response.getEntity();
+ String message = EntityUtils.toString(responseEntity, UTF8);
+
System.out.println(JsonParser.parseString(message).getAsJsonObject().toString());
+ } catch (IOException e) {
+ e.printStackTrace();
+ System.out.println("create table failed");
+ } finally {
+ try {
+ if (response != null) {
+ response.close();
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ System.out.println("Response close error");
+ }
+ }
+ }
+
+ public void query() {
+ CloseableHttpClient httpClient = SSLClient.getInstance().getHttpClient();
+ CloseableHttpResponse response = null;
+ try {
+ HttpPost httpPost =
getHttpPost("https://127.0.0.1:18080/rest/table/v1/query");
+ String sql = "{\"database\":\"test\",\"sql\":\"select * from sg211\"}";
+ httpPost.setEntity(new StringEntity(sql, Charset.defaultCharset()));
+ response = httpClient.execute(httpPost);
+ HttpEntity responseEntity = response.getEntity();
+ String message = EntityUtils.toString(responseEntity, UTF8);
+
System.out.println(JsonParser.parseString(message).getAsJsonObject().toString());
+ } catch (IOException e) {
+ e.printStackTrace();
+ System.out.println("The query rest api failed");
+ } finally {
+ try {
+ if (response != null) {
+ response.close();
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ System.out.println("Response close error");
+ }
+ }
+ }
+}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/rest/it/IoTDBRestServiceCaseWhenThenIT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/rest/it/IoTDBRestServiceCaseWhenThenIT.java
new file mode 100644
index 00000000000..4e48015e24e
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/rest/it/IoTDBRestServiceCaseWhenThenIT.java
@@ -0,0 +1,492 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.relational.it.rest.it;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+import org.apache.iotdb.itbase.category.RemoteIT;
+import org.apache.iotdb.itbase.env.BaseEnv;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.util.EntityUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class, RemoteIT.class})
+public class IoTDBRestServiceCaseWhenThenIT {
+
+ private int port = 18080;
+ private CloseableHttpClient httpClient = null;
+
+ @Before
+ public void setUp() throws Exception {
+ BaseEnv baseEnv = EnvFactory.getEnv();
+ baseEnv.getConfig().getDataNodeConfig().setEnableRestService(true);
+ baseEnv.initClusterEnvironment();
+ DataNodeWrapper portConflictDataNodeWrapper =
EnvFactory.getEnv().getDataNodeWrapper(0);
+ port = portConflictDataNodeWrapper.getRestServicePort();
+ httpClient = HttpClientBuilder.create().build();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ try {
+ if (httpClient != null) {
+ httpClient.close();
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ EnvFactory.getEnv().cleanClusterEnvironment();
+ }
+
+ private static final String DATABASE = "test";
+
+ private static final String[] expectedHeader = {"_col0"};
+
+ private static final String[] SQLs =
+ new String[] {
+ // normal cases
+ "CREATE DATABASE " + DATABASE,
+ "CREATE table table1 (device_id STRING ID, s1 INT32 MEASUREMENT, s5
BOOLEAN MEASUREMENT, s6 TEXT MEASUREMENT)",
+ "CREATE table table2 (device_id STRING ID, s3 FLOAT MEASUREMENT, s4
DOUBLE MEASUREMENT)",
+ "CREATE table table3 (device_id STRING ID, s2 INT64 MEASUREMENT)",
+ "INSERT INTO table1(time, device_id, s1) values(100, 'd1', 0)",
+ "INSERT INTO table1(time, device_id, s1) values(200, 'd1', 11)",
+ "INSERT INTO table1(time, device_id, s1) values(300, 'd1', 22)",
+ "INSERT INTO table1(time, device_id, s1) values(400, 'd1', 33)",
+ "INSERT INTO table2(time, device_id, s3) values(100, 'd1', 0)",
+ "INSERT INTO table2(time, device_id, s3) values(200, 'd1', 11)",
+ "INSERT INTO table2(time, device_id, s3) values(300, 'd1', 22)",
+ "INSERT INTO table2(time, device_id, s3) values(400, 'd1', 33)",
+ "INSERT INTO table2(time, device_id, s4) values(100, 'd1', 44)",
+ "INSERT INTO table2(time, device_id, s4) values(200, 'd1', 55)",
+ "INSERT INTO table2(time, device_id, s4) values(300, 'd1', 66)",
+ "INSERT INTO table2(time, device_id, s4) values(400, 'd1', 77)",
+ };
+
+ @Test
+ public void test() {
+ ping();
+ prepareTableData();
+ testKind1Basic();
+ testKind2Basic();
+ testShortCircuitEvaluation();
+ testKind1InputTypeRestrict();
+ testKind2InputTypeRestrict();
+ testKind1OutputTypeRestrict();
+ testKind2OutputTypeRestrict();
+ testKind1UsedInOtherOperation();
+ testKind2UsedInOtherOperation();
+ testKind1UseOtherOperation();
+ testKind2UseOtherOperation();
+ testKind1UseInWhereClause();
+ testKind1CaseInCase();
+ testKind2CaseInCase();
+ testKind1Logic();
+ testKind2UseOtherOperation();
+ testKind1UseInWhereClause();
+ }
+
+ public void ping() {
+ HttpGet httpGet = new HttpGet("http://127.0.0.1:" + port + "/ping");
+ CloseableHttpResponse response = null;
+ try {
+ for (int i = 0; i < 30; i++) {
+ try {
+ response = httpClient.execute(httpGet);
+ break;
+ } catch (Exception e) {
+ if (i == 29) {
+ throw e;
+ }
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+ }
+
+ HttpEntity responseEntity = response.getEntity();
+ String message = EntityUtils.toString(responseEntity, "utf-8");
+ JsonObject result = JsonParser.parseString(message).getAsJsonObject();
+ assertEquals(200, response.getStatusLine().getStatusCode());
+ assertEquals(200, Integer.parseInt(result.get("code").toString()));
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ } finally {
+
+ try {
+ if (response != null) {
+ response.close();
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+ }
+
+ public void testKind1Basic() {
+ String[] retArray = new String[] {"99,", "9999,", "9999,", "999,"};
+ tableResultSetEqualTest(
+ "select case when s1=0 then 99 when s1>22 then 999 else 9999 end from
table1",
+ expectedHeader,
+ retArray,
+ DATABASE);
+ }
+
+ public void testKind2Basic() {
+ String sql = "select case s1 when 0 then 99 when 22 then 999 else 9999 end
from table1";
+ String[] retArray = new String[] {"99,", "9999,", "999,", "9999,"};
+ tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE);
+
+ // without ELSE clause
+ sql = "select case s1 when 0 then 99 when 22 then 999 end from table1";
+ retArray = new String[] {"99,", "null,", "999,", "null,"};
+ tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE);
+ }
+
+ public void testShortCircuitEvaluation() {
+ String[] retArray = new String[] {"0,", "11,", "22,", "33,"};
+ tableResultSetEqualTest(
+ "select case when 1=0 then s1/0 when 1!=0 then s1 end from table1",
+ expectedHeader,
+ retArray,
+ DATABASE);
+ }
+
+ public void testKind1InputTypeRestrict() {
+ // WHEN clause must return BOOLEAN
+ String sql = "select case when s1+1 then 20 else 22 end from table1";
+ String msg = "701: CASE WHEN clause must evaluate to a BOOLEAN (actual:
INT32)";
+ tableAssertTestFail(sql, msg, DATABASE);
+ }
+
+ public void testKind2InputTypeRestrict() {
+ // the expression in CASE clause must be able to be equated with the
expression in WHEN clause
+ String sql = "select case s1 when '1' then 20 else 22 end from table1";
+ String msg = "701: CASE operand type does not match WHEN clause operand
type: INT32 vs STRING";
+ tableAssertTestFail(sql, msg, DATABASE);
+ }
+
+ public void testKind1OutputTypeRestrict() {
+ // BOOLEAN and other types cannot exist at the same time
+ String[] retArray = new String[] {"true,", "false,", "true,", "true,"};
+ // success
+ tableResultSetEqualTest(
+ "select case when s1<=0 then true when s1=11 then false else true end
from table1",
+ expectedHeader,
+ retArray,
+ DATABASE);
+ // fail
+ tableAssertTestFail(
+ "select case when s1<=0 then true else 22 end from table1",
+ "701: All CASE results must be the same type or coercible to a common
type. Cannot find common type between BOOLEAN and INT32, all types (without
duplicates): [BOOLEAN, INT32]",
+ DATABASE);
+
+ // TEXT and other types cannot exist at the same time
+ retArray = new String[] {"good,", "bad,", "okok,", "okok,"};
+ // success
+ tableResultSetEqualTest(
+ "select case when s1<=0 then 'good' when s1=11 then 'bad' else 'okok'
end from table1",
+ expectedHeader,
+ retArray,
+ DATABASE);
+ // fail
+ tableAssertTestFail(
+ "select case when s1<=0 then 'good' else 22 end from table1",
+ "701: All CASE results must be the same type or coercible to a common
type. Cannot find common type between STRING and INT32, all types (without
duplicates): [STRING, INT32]",
+ DATABASE);
+ }
+
+ public void testKind2OutputTypeRestrict() {
+ // BOOLEAN and other types cannot exist at the same time
+ String[] retArray =
+ new String[] {
+ "true,", "false,", "true,", "true,",
+ };
+ // success
+ tableResultSetEqualTest(
+ "select case s1 when 0 then true when 11 then false else true end from
table1",
+ expectedHeader,
+ retArray,
+ DATABASE);
+ // fail
+ tableAssertTestFail(
+ "select case s1 when 0 then true else 22 end from table1",
+ "701: All CASE results must be the same type or coercible to a common
type. Cannot find common type between BOOLEAN and INT32, all types (without
duplicates): [BOOLEAN, INT32]",
+ DATABASE);
+
+ // TEXT and other types cannot exist at the same time
+ retArray = new String[] {"good,", "bad,", "okok,", "okok,"};
+ // success
+ tableResultSetEqualTest(
+ "select case s1 when 0 then 'good' when 11 then 'bad' else 'okok' end
from table1",
+ expectedHeader,
+ retArray,
+ DATABASE);
+ // fail
+ tableAssertTestFail(
+ "select case s1 when 0 then 'good' else 22 end from table1",
+ "701: All CASE results must be the same type or coercible to a common
type. Cannot find common type between STRING and INT32, all types (without
duplicates): [STRING, INT32]",
+ DATABASE);
+ }
+
+ public void testKind1UsedInOtherOperation() {
+ String sql;
+ String[] retArray;
+
+ // use in scalar operation
+
+ // multiply
+ sql = "select 2 * case when s1=0 then 99 when s1=22.0 then 999 else 9999
end from table1";
+ retArray = new String[] {"198,", "19998,", "1998,", "19998,"};
+ tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE);
+
+ // add
+ sql =
+ "select "
+ + "case when s1=0 then 99 when s1=22.0 then 999 else 9999 end "
+ + "+"
+ + "case when s1=11 then 99 else 9999 end "
+ + "from table1";
+ retArray =
+ new String[] {
+ "10098,", "10098,", "10998,", "19998,",
+ };
+ tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE);
+
+ // function
+ sql = "select diff(case when s1=0 then 99 when s1>22 then 999 else 9999
end) from table1";
+ retArray = new String[] {"null,", "9900.0,", "0.0,", "-9000.0,"};
+ tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE);
+ }
+
+ public void testKind2UsedInOtherOperation() {
+ String sql;
+ String[] retArray;
+
+ // use in scalar operation
+
+ // multiply
+ sql = "select 2 * case s1 when 0 then 99 when 22 then 999 else 9999 end
from table1";
+ retArray = new String[] {"198,", "19998,", "1998,", "19998,"};
+ tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE);
+ sql = "select diff(case s1 when 0 then 99 when 22 then 999 else 9999 end)
from table1";
+ retArray =
+ new String[] {
+ "null,", "9900.0,", "-9000.0,", "9000.0,",
+ };
+ tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE);
+ }
+
+ public void testKind1UseOtherOperation() {
+ // WHEN-clause use scalar function
+ String sql = "select case when sin(s1)>=0 then '>0' else '<0' end from
table1";
+ String[] retArray =
+ new String[] {
+ ">0,", "<0,", "<0,", ">0,",
+ };
+ tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE);
+
+ // THEN-clause and ELSE-clause use scalar function
+
+ // TODO: align by is not supported.
+
+ // sql =
+ // "select case when s1<=11 then CAST(diff(s1) as TEXT) else
CAST(s1-1 as TEXT) end from
+ // table1 align by device";
+ //
+ // retArray =
+ // new String[] {
+ // "0,table1,null,",
+ // "1000000,table1,11.0,",
+ // "20000000,table1,21.0,",
+ // "210000000,table1,32.0,",
+ // };
+ // tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE);
+ }
+
+ public void testKind2UseOtherOperation() {
+ // CASE-clause use scalar function
+ String sql =
+ "select case round(sin(s1)) when 0 then '=0' when -1 then '<0' else
'>0' end from table1";
+
+ tableAssertTestFail(
+ sql,
+ "701: CASE operand type does not match WHEN clause operand type:
DOUBLE vs INT32",
+ DATABASE);
+ // String[] retArray =
+ // new String[] {
+ // "=0,", "<0,", ">0,", ">0,",
+ // };
+ // tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE);
+
+ // WHEN-clause use scalar function
+ sql = "select case 0 when sin(s1) then '=0' else '!=0' end from table1";
+ tableAssertTestFail(
+ sql,
+ "701: CASE operand type does not match WHEN clause operand type: INT32
vs DOUBLE",
+ DATABASE);
+ // retArray =
+ // new String[] {
+ // "=0,", "!=0,", "!=0,", "!=0,",
+ // };
+ // tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE);
+
+ // THEN-clause and ELSE-clause use scalar function
+ // sql =
+ // "select case s1 when 11 then CAST(diff(s1) as TEXT) else
CAST(s1-1 as TEXT) end from
+ // table1 align by device";
+ //
+ // retArray =
+ // new String[] {
+ // "table1,-1.0,", "table1,11.0,", "table1,21.0,", "table1,32.0,",
+ // };
+ // tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE);
+
+ // UDF is not allowed
+ // sql = "select case s1 when 0 then change_points(s1) end from table1";
+ // String msg = "301: CASE expression cannot be used with non-mappable
UDF";
+ // tableAssertTestFail(sql, msg, DATABASE);
+ }
+
+ public void testKind1UseInWhereClause() {
+ String sql =
+ "select s4 from table2 where case when s3=0 then s4>44 when s3=22 then
s4>0 when time>300 then true end";
+ String[] retArray = new String[] {"66.0,", "77.0,"};
+ tableResultSetEqualTest(sql, new String[] {"s4"}, retArray, DATABASE);
+
+ sql =
+ "select case when s3=0 then s4>44 when s3=22 then s4>0 when time>300
then true end from table2";
+ retArray =
+ new String[] {
+ "false,", "null,", "true,", "true,",
+ };
+ tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE);
+ }
+
+ public void testKind1CaseInCase() {
+ String sql =
+ "select case when s1=0 OR s1=22 then cast(case when s1=0 then 99 when
s1>22 then 999 end as STRING) else 'xxx' end from table1";
+ String[] retArray =
+ new String[] {
+ "99,", "xxx,", "null,", "xxx,",
+ };
+ tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE);
+ }
+
+ public void testKind2CaseInCase() {
+ String sql =
+ "select case s1 when 0 then cast(case when s1=0 then 99 when s1>22
then 999 end as STRING) when 22 then cast(case when s1=0 then 99 when s1>22
then 999 end as STRING) else 'xxx' end from table1";
+ String[] retArray =
+ new String[] {
+ "99,", "xxx,", "null,", "xxx,",
+ };
+ tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE);
+ }
+
+ public void testKind1Logic() {
+ String sql =
+ "select case when s3 >= 0 and s3 < 20 and s4 >= 50 and s4 < 60 then
'just so so~~~' when s3 >= 20 and s3 < 40 and s4 >= 70 and s4 < 80 then 'very
well~~~' end from table2";
+ String[] retArray = new String[] {"null,", "just so so~~~,", "null,",
"very well~~~,"};
+ tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE);
+ }
+
+ public void tableAssertTestFail(String sql, String errMsg, String database) {
+ JsonObject jsonObject = new JsonObject();
+ jsonObject.addProperty("database", database);
+ jsonObject.addProperty("sql", sql);
+ JsonObject result = query(jsonObject.toString());
+ assertEquals(errMsg, result.get("code") + ": " +
result.get("message").getAsString());
+ }
+
+ public void tableResultSetEqualTest(
+ String sql, String[] expectedHeader, String[] expectedRetArray, String
database) {
+ JsonObject jsonObject = new JsonObject();
+ jsonObject.addProperty("database", database);
+ jsonObject.addProperty("sql", sql);
+ JsonObject result = query(jsonObject.toString());
+ JsonArray columnNames = result.get("column_names").getAsJsonArray();
+ JsonArray valuesList = result.get("values").getAsJsonArray();
+ for (int i = 0; i < columnNames.size(); i++) {
+ assertEquals(expectedHeader[i], columnNames.get(i).getAsString());
+ }
+ assertEquals(expectedHeader.length, columnNames.size());
+ int cnt = 0;
+ for (int i = 0; i < valuesList.size(); i++) {
+ StringBuilder builder = new StringBuilder();
+ JsonArray values = valuesList.get(i).getAsJsonArray();
+ for (int c = 0; c < values.size(); c++) {
+ if (!values.get(c).isJsonNull()) {
+ builder.append(values.get(c).getAsString()).append(",");
+ } else {
+ builder.append(values.get(c).toString()).append(",");
+ }
+ }
+ assertEquals(expectedRetArray[i], builder.toString());
+ cnt++;
+ }
+ assertEquals(expectedRetArray.length, cnt);
+ }
+
+ public void prepareTableData() {
+ for (int i = 0; i < SQLs.length; i++) {
+ JsonObject jsonObject = new JsonObject();
+ if (i > 0) {
+ jsonObject.addProperty("database", DATABASE);
+ } else {
+ jsonObject.addProperty("database", "");
+ }
+ jsonObject.addProperty("sql", SQLs[i]);
+ nonQuery(jsonObject.toString());
+ }
+ }
+
+ public JsonObject query(String json) {
+ return RestUtils.query(httpClient, port, json);
+ }
+
+ public JsonObject nonQuery(String json) {
+ return RestUtils.nonQuery(httpClient, port, json);
+ }
+}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/rest/it/IoTDBRestServiceFlushQueryIT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/rest/it/IoTDBRestServiceFlushQueryIT.java
new file mode 100644
index 00000000000..0b37fe33500
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/rest/it/IoTDBRestServiceFlushQueryIT.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.relational.it.rest.it;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+import org.apache.iotdb.itbase.category.RemoteIT;
+import org.apache.iotdb.itbase.env.BaseEnv;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.util.EntityUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class, RemoteIT.class})
+public class IoTDBRestServiceFlushQueryIT {
+
+ private int port = 18080;
+ private CloseableHttpClient httpClient = null;
+
+ @Before
+ public void setUp() throws Exception {
+ BaseEnv baseEnv = EnvFactory.getEnv();
+ baseEnv.getConfig().getDataNodeConfig().setEnableRestService(true);
+ baseEnv.initClusterEnvironment();
+ DataNodeWrapper portConflictDataNodeWrapper =
EnvFactory.getEnv().getDataNodeWrapper(0);
+ port = portConflictDataNodeWrapper.getRestServicePort();
+ httpClient = HttpClientBuilder.create().build();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ try {
+ if (httpClient != null) {
+ httpClient.close();
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ EnvFactory.getEnv().cleanClusterEnvironment();
+ }
+
+ private static final String DATABASE = "test";
+
+ private static final String[] sqls =
+ new String[] {
+ "CREATE DATABASE test",
+ "CREATE TABLE vehicle (id1 string id, s0 int32 measurement)",
+ "insert into vehicle(id1,time,s0) values('d0',1,101)",
+ "insert into vehicle(id1,time,s0) values('d0',2,198)",
+ "insert into vehicle(id1,time,s0) values('d0',100,99)",
+ "insert into vehicle(id1,time,s0) values('d0',101,99)",
+ "insert into vehicle(id1,time,s0) values('d0',102,80)",
+ "insert into vehicle(id1,time,s0) values('d0',103,99)",
+ "insert into vehicle(id1,time,s0) values('d0',104,90)",
+ "insert into vehicle(id1,time,s0) values('d0',105,99)",
+ "insert into vehicle(id1,time,s0) values('d0',106,99)",
+ "flush",
+ "insert into vehicle(id1,time,s0) values('d0',2,10000)",
+ "insert into vehicle(id1,time,s0) values('d0',50,10000)",
+ "insert into vehicle(id1,time,s0) values('d0',1000,22222)",
+ };
+
+ public void ping() {
+ HttpGet httpGet = new HttpGet("http://127.0.0.1:" + port + "/ping");
+ CloseableHttpResponse response = null;
+ try {
+ for (int i = 0; i < 30; i++) {
+ try {
+ response = httpClient.execute(httpGet);
+ break;
+ } catch (Exception e) {
+ if (i == 29) {
+ throw e;
+ }
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+ }
+
+ HttpEntity responseEntity = response.getEntity();
+ String message = EntityUtils.toString(responseEntity, "utf-8");
+ JsonObject result = JsonParser.parseString(message).getAsJsonObject();
+ assertEquals(200, response.getStatusLine().getStatusCode());
+ assertEquals(200, Integer.parseInt(result.get("code").toString()));
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ } finally {
+
+ try {
+ if (response != null) {
+ response.close();
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+ }
+
+ @Test
+ public void test() {
+ ping();
+ prepareTableData();
+ selectAllSQLTest();
+ testFlushGivenGroup();
+ testFlushGivenGroupNoData();
+ }
+
+ public String sqlHandler(String database, String sql) {
+ JsonObject json = new JsonObject();
+ json.addProperty("database", database);
+ json.addProperty("sql", sql);
+ return json.toString();
+ }
+
+ public void selectAllSQLTest() {
+ String sql = sqlHandler("test", "SELECT * FROM vehicle");
+ JsonObject jsonObject = query(sql);
+ JsonArray valuesList = jsonObject.getAsJsonArray("values");
+ for (int i = 0; i < valuesList.size(); i++) {
+ JsonArray jsonArray = valuesList.get(i).getAsJsonArray();
+ for (int j = 0; j < jsonArray.size(); j++) {
+ jsonArray.get(j);
+ }
+ }
+ }
+
+ public void testFlushGivenGroup() {
+ List<String> list =
+ Arrays.asList("CREATE DATABASE group1", "CREATE DATABASE group2",
"CREATE DATABASE group3");
+ for (String sql : list) {
+ nonQuery(sqlHandler("", sql));
+ }
+
+ String insertTemplate =
+ "INSERT INTO vehicle(id1, time, s1, s2, s3) VALUES (%s, %d, %d, %f,
%s)";
+ for (int i = 1; i <= 3; i++) {
+ nonQuery(sqlHandler("", String.format("USE \"group%d\"", i)));
+ nonQuery(
+ sqlHandler(
+ String.format("group%d", i),
+ "CREATE TABLE vehicle (id1 string id, s1 int32 measurement, s2
float measurement, s3 string measurement)"));
+
+ for (int j = 10; j < 20; j++) {
+ nonQuery(String.format(Locale.CHINA, insertTemplate, i, j, j, j * 0.1,
j));
+ }
+ }
+ nonQuery(sqlHandler("", "FLUSH"));
+
+ for (int i = 1; i <= 3; i++) {
+ nonQuery(sqlHandler("", String.format("USE \"group%d\"", i)));
+ nonQuery(
+ sqlHandler(
+ String.format("group%d", i),
+ "CREATE TABLE vehicle (id1 string id, s1 int32 measurement, s2
float measurement, s3 string measurement)"));
+ }
+ nonQuery(sqlHandler("", "FLUSH group1"));
+ nonQuery(sqlHandler("", "FLUSH group2,group3"));
+
+ for (int i = 1; i <= 3; i++) {
+ nonQuery(sqlHandler("", String.format("USE \"group%d\"", i)));
+ nonQuery(
+ sqlHandler(
+ String.format("group%d", i),
+ "CREATE TABLE vehicle (id1 string id, s1 int32 measurement, s2
float measurement, s3 string measurement)"));
+
+ for (int j = 0; j < 30; j++) {
+ nonQuery(String.format(Locale.CHINA, insertTemplate, i, j, j, j * 0.1,
j));
+ }
+ }
+ nonQuery(sqlHandler("", "FLUSH group1 TRUE"));
+ nonQuery(sqlHandler("", "FLUSH group2,group3 FALSE"));
+
+ for (int i = 1; i <= 3; i++) {
+ int count = 0;
+ nonQuery(sqlHandler("", String.format("USE \"group%d\"", i)));
+ JsonObject jsonObject =
+ query(sqlHandler(String.format("group%d", i), "SELECT * FROM
vehicle"));
+
+ JsonArray valuesList = jsonObject.getAsJsonArray("values");
+ for (int c = 0; c < valuesList.size(); c++) {
+ count++;
+ JsonArray jsonArray = valuesList.get(c).getAsJsonArray();
+ for (int j = 0; j < jsonArray.size(); j++) {
+ jsonArray.get(j);
+ }
+ assertEquals(30, count);
+ }
+ }
+ }
+
+ public void testFlushGivenGroupNoData() {
+ List<String> list =
+ Arrays.asList(
+ "CREATE DATABASE nodatagroup1",
+ "CREATE DATABASE nodatagroup2",
+ "CREATE DATABASE nodatagroup3",
+ "FLUSH nodatagroup1",
+ "FLUSH nodatagroup2",
+ "FLUSH nodatagroup3",
+ "FLUSH nodatagroup1, nodatagroup2");
+ for (String sql : list) {
+ JsonObject jsonObject = new JsonObject();
+ jsonObject.addProperty("database", "");
+ jsonObject.addProperty("sql", sql);
+ nonQuery(jsonObject.toString());
+ }
+ }
+
+ public void tableAssertTestFail(String sql, String errMsg, String database) {
+ JsonObject jsonObject = new JsonObject();
+ jsonObject.addProperty("database", database);
+ jsonObject.addProperty("sql", sql);
+ JsonObject result = query(jsonObject.toString());
+ assertEquals(errMsg, result.get("code") + ": " +
result.get("message").getAsString());
+ }
+
+ public void tableResultSetEqualTest(
+ String sql, String[] expectedHeader, String[] expectedRetArray, String
database) {
+ JsonObject jsonObject = new JsonObject();
+ jsonObject.addProperty("database", database);
+ jsonObject.addProperty("sql", sql);
+ JsonObject result = query(jsonObject.toString());
+ JsonArray columnNames = result.get("column_names").getAsJsonArray();
+ JsonArray valuesList = result.get("values").getAsJsonArray();
+ for (int i = 0; i < columnNames.size(); i++) {
+ assertEquals(expectedHeader[i], columnNames.get(i).getAsString());
+ }
+ assertEquals(expectedHeader.length, columnNames.size());
+ int cnt = 0;
+ for (int i = 0; i < valuesList.size(); i++) {
+ StringBuilder builder = new StringBuilder();
+ JsonArray values = valuesList.get(i).getAsJsonArray();
+ for (int c = 0; c < values.size(); c++) {
+ if (!values.get(c).isJsonNull()) {
+ builder.append(values.get(c).getAsString()).append(",");
+ } else {
+ builder.append(values.get(c).toString()).append(",");
+ }
+ }
+ assertEquals(expectedRetArray[i], builder.toString());
+ cnt++;
+ }
+ assertEquals(expectedRetArray.length, cnt);
+ }
+
+ public void prepareTableData() {
+ for (int i = 0; i < sqls.length; i++) {
+ JsonObject jsonObject = new JsonObject();
+ if (i > 0) {
+ jsonObject.addProperty("database", DATABASE);
+ } else {
+ jsonObject.addProperty("database", "");
+ }
+ jsonObject.addProperty("sql", sqls[i]);
+ nonQuery(jsonObject.toString());
+ }
+ }
+
+ public JsonObject query(String json) {
+ return RestUtils.query(httpClient, port, json);
+ }
+
+ public JsonObject nonQuery(String json) {
+ return RestUtils.nonQuery(httpClient, port, json);
+ }
+}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/rest/it/IoTDBRestServiceIT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/rest/it/IoTDBRestServiceIT.java
new file mode 100644
index 00000000000..8d3a3b54684
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/rest/it/IoTDBRestServiceIT.java
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.relational.it.rest.it;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+import org.apache.iotdb.itbase.category.RemoteIT;
+import org.apache.iotdb.itbase.env.BaseEnv;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.util.EntityUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class, RemoteIT.class})
+public class IoTDBRestServiceIT {
+
+ private int port = 18080;
+ private CloseableHttpClient httpClient = null;
+
+ @Before
+ public void setUp() throws Exception {
+ BaseEnv baseEnv = EnvFactory.getEnv();
+ baseEnv.getConfig().getDataNodeConfig().setEnableRestService(true);
+ baseEnv.initClusterEnvironment();
+ DataNodeWrapper portConflictDataNodeWrapper =
EnvFactory.getEnv().getDataNodeWrapper(0);
+ port = portConflictDataNodeWrapper.getRestServicePort();
+ httpClient = HttpClientBuilder.create().build();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ try {
+ if (httpClient != null) {
+ httpClient.close();
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ EnvFactory.getEnv().cleanClusterEnvironment();
+ }
+
+ private static final String DATABASE = "test";
+
+ private static final String[] sqls =
+ new String[] {
+ "create database if not exists test",
+ "use test",
+ "CREATE TABLE sg10(id1 string id, s1 int64 measurement, s2 float
measurement, s3 string measurement)",
+ "CREATE TABLE sg11(id1 string id, s1 int64 measurement, s2 float
measurement, s3 string measurement)"
+ };
+
+ public void ping() {
+ HttpGet httpGet = new HttpGet("http://127.0.0.1:" + port + "/ping");
+ CloseableHttpResponse response = null;
+ try {
+ for (int i = 0; i < 30; i++) {
+ try {
+ response = httpClient.execute(httpGet);
+ break;
+ } catch (Exception e) {
+ if (i == 29) {
+ throw e;
+ }
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+ }
+
+ HttpEntity responseEntity = response.getEntity();
+ String message = EntityUtils.toString(responseEntity, "utf-8");
+ JsonObject result = JsonParser.parseString(message).getAsJsonObject();
+ assertEquals(200, response.getStatusLine().getStatusCode());
+ assertEquals(200, Integer.parseInt(result.get("code").toString()));
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ } finally {
+ try {
+ if (response != null) {
+ response.close();
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+ }
+
+ @Test
+ public void test() {
+ ping();
+ prepareTableData();
+ rightNonQuery();
+ rightNonQuery2();
+ rightNonQuery3();
+ rightNonQuery4();
+ errorNonQuery();
+ errorNonQuery1();
+ errorNonQuery3();
+ testInsertMultiPartition();
+ testInsertTablet();
+ testInsertTabletNoDatabase();
+ testInsertTablet1();
+ testInsertTablet2();
+ testQuery();
+ testQuery1();
+ testQuery2();
+ }
+
+ public void testQuery() {
+ String sql = "insert into sg11(id1,s1,s2,s3,time)
values('aa',11,1.1,1,1),('aa2',21,2.1,2,2)";
+ JsonObject result = RestUtils.nonQuery(httpClient, port,
sqlHandler("test", sql));
+ assertEquals(200, result.get("code").getAsInt());
+ JsonObject queryResult =
+ RestUtils.query(
+ httpClient,
+ port,
+ sqlHandler("test", "select id1,s1,s2,s3,time from sg11 order by
time"));
+ JsonArray jsonArray = queryResult.get("values").getAsJsonArray();
+ for (int i = 0; i < jsonArray.size(); i++) {
+ JsonArray jsonArray1 = jsonArray.get(i).getAsJsonArray();
+
+ if (i == 0) {
+ assertEquals("aa", jsonArray1.get(0).getAsString());
+ assertEquals(11, jsonArray1.get(1).getAsInt());
+ assertEquals(1.1, jsonArray1.get(2).getAsFloat(), 0.000001f);
+ assertEquals("1", jsonArray1.get(3).getAsString());
+ } else if (i == 1) {
+ assertEquals("aa2", jsonArray1.get(0).getAsString());
+ assertEquals(21, jsonArray1.get(1).getAsInt());
+ assertEquals(2.1, jsonArray1.get(2).getAsFloat(), 0.000001f);
+ assertEquals("2", jsonArray1.get(3).getAsString());
+ assertEquals(2, jsonArray1.get(4).getAsLong());
+ }
+ }
+ }
+
+ public void testQuery1() {
+ JsonObject result =
+ RestUtils.query(
+ httpClient, port, sqlHandler(null, "select id1,s1,s2,s3,time from
sg11 order by time"));
+ assertEquals(305, result.get("code").getAsInt());
+ assertEquals("database should not be null",
result.get("message").getAsString());
+ }
+
+ public void testQuery2() {
+ JsonObject result = RestUtils.query(httpClient, port, sqlHandler("test",
null));
+ assertEquals(305, result.get("code").getAsInt());
+ assertEquals("sql should not be null",
result.get("message").getAsString());
+ }
+
+ public void rightNonQuery() {
+ String sql = "create database test1";
+ JsonObject result = RestUtils.nonQuery(httpClient, port, sqlHandler("",
sql));
+ assertEquals(200, result.get("code").getAsInt());
+ }
+
+ public void rightNonQuery2() {
+ String sql = "insert into sg10(id1,s1,time,s2) values('aa',1,1,1.1)";
+ JsonObject result = RestUtils.nonQuery(httpClient, port,
sqlHandler("test", sql));
+ assertEquals(200, result.get("code").getAsInt());
+ }
+
+ public void rightNonQuery4() {
+ String sql = "insert into sg10(id1,s1,time,s2)
values('aa',1,1,1.1),('bb',2,2,2.1)";
+ JsonObject result = RestUtils.nonQuery(httpClient, port,
sqlHandler("test", sql));
+ assertEquals(200, result.get("code").getAsInt());
+ }
+
+ public void rightNonQuery3() {
+ String sql = "drop database test1";
+ JsonObject result = RestUtils.nonQuery(httpClient, port,
sqlHandler("test", sql));
+ assertEquals(200, result.get("code").getAsInt());
+ }
+
+ public void errorNonQuery() {
+ String sql = "create database test";
+ JsonObject result = RestUtils.nonQuery(httpClient, port, sqlHandler("",
sql));
+ assertEquals(501, result.get("code").getAsInt());
+ assertEquals("Database test already exists",
result.get("message").getAsString());
+ }
+
+ public void errorNonQuery1() {
+ String sql =
+ "CREATE TABLE sg10(id1 string id, s1 int64 measurement, s2 float
measurement, s3 string measurement)";
+ JsonObject result = RestUtils.nonQuery(httpClient, port, sqlHandler(null,
sql));
+ assertEquals(305, result.get("code").getAsInt());
+ assertEquals("database should not be null",
result.get("message").getAsString());
+ }
+
+ public void errorNonQuery2() {
+ String sql = "create database test";
+ JsonObject result = RestUtils.nonQuery(httpClient, port, sqlHandler(null,
sql));
+ assertEquals(305, result.get("code").getAsInt());
+ assertEquals("database should not be null",
result.get("message").getAsString());
+ }
+
+ public void errorNonQuery3() {
+ String sql = "select * from sg10";
+ JsonObject result = RestUtils.nonQuery(httpClient, port,
sqlHandler("test", sql));
+ assertEquals(301, result.get("code").getAsInt());
+ assertEquals("EXECUTE_STATEMENT_ERROR",
result.get("message").getAsString());
+ }
+
+ public String sqlHandler(String database, String sql) {
+ JsonObject json = new JsonObject();
+ json.addProperty("database", database);
+ json.addProperty("sql", sql);
+ return json.toString();
+ }
+
+ public void testInsertMultiPartition() {
+ List<String> sqls =
+ Arrays.asList(
+ "create table sg1 (id1 string id, s1 int32 measurement)",
+ "insert into sg1(id1,time,s1) values('d1',1,2)",
+ "flush",
+ "insert into sg1(id1,time,s1) values('d1',2,2)",
+ "insert into sg1(id1,time,s1) values('d1',604800001,2)",
+ "flush");
+ for (String sql : sqls) {
+ RestUtils.nonQuery(httpClient, port, sqlHandler("test", sql));
+ }
+ }
+
+ public void testInsertTablet() {
+ List<String> sqls =
+ Collections.singletonList(
+ "create table sg211 (id1 string id,t1 STRING ATTRIBUTE, s1 FLOAT
measurement)");
+ for (String sql : sqls) {
+ RestUtils.nonQuery(httpClient, port, sqlHandler("test", sql));
+ }
+ String json =
+
"{\"database\":\"test\",\"column_types\":[\"ID\",\"ATTRIBUTE\",\"MEASUREMENT\"],\"timestamps\":[1635232143960,1635232153960,1635232163960,1635232173960,1635232183960],\"column_names\":[\"id1\",\"t1\",\"s1\"],\"data_types\":[\"STRING\",\"STRING\",\"FLOAT\"],\"values\":[[\"a11\",\"true\",11],[\"a11\",\"false\",22],[\"a13\",\"false1\",23],[\"a14\",\"false2\",24],[\"a15\",\"false3\",25]],\"table\":\"sg211\"}";
+ rightInsertTablet(json);
+ }
+
+ public void testInsertTabletNoDatabase() {
+ List<String> sqls =
+ Collections.singletonList(
+ "create table sg211 (id1 string id,t1 STRING ATTRIBUTE, s1 FLOAT
measurement)");
+ for (String sql : sqls) {
+ RestUtils.nonQuery(httpClient, port, sqlHandler("test", sql));
+ }
+ String json =
+
"{\"database\":\"\",\"column_types\":[\"ID\",\"ATTRIBUTE\",\"MEASUREMENT\"],\"timestamps\":[1635232143960,1635232153960,1635232163960,1635232173960,1635232183960],\"column_names\":[\"id1\",\"t1\",\"s1\"],\"data_types\":[\"STRING\",\"STRING\",\"FLOAT\"],\"values\":[[\"a11\",\"true\",11],[\"a11\",\"false\",22],[\"a13\",\"false1\",23],[\"a14\",\"false2\",24],[\"a15\",\"false3\",25]],\"table\":\"sg211\"}";
+ JsonObject result = RestUtils.insertTablet(httpClient, port, json);
+ assertEquals(305, Integer.parseInt(result.get("code").toString()));
+ }
+
+ public void testInsertTablet1() {
+ List<String> sqls =
+ Collections.singletonList(
+ "create table sg211 (id1 string id,t1 STRING ATTRIBUTE, s1 FLOAT
measurement)");
+ for (String sql : sqls) {
+ RestUtils.nonQuery(httpClient, port, sqlHandler("test", sql));
+ }
+ String json =
+
"{\"database\":\"test\",\"column_types\":[\"ATTRIBUTE\",\"MEASUREMENT\"],\"timestamps\":[1635232143960,1635232153960,1635232163960,1635232173960,1635232183960],\"column_names\":[\"id1\",\"t1\",\"s1\"],\"data_types\":[\"STRING\",\"STRING\",\"FLOAT\"],\"values\":[[\"a11\",\"true\",11],[\"a11\",\"false\",22],[\"a13\",\"false1\",23],[\"a14\",\"false2\",24],[\"a15\",\"false3\",25]],\"table\":\"sg211\"}";
+ JsonObject result = RestUtils.insertTablet(httpClient, port, json);
+ assertEquals(305, Integer.parseInt(result.get("code").toString()));
+ assertEquals(
+ "column_names and column_types should have the same size,column_types
and data_types should have the same size",
+ result.get("message").getAsString());
+ }
+
+ public void testInsertTablet2() {
+ List<String> sqls =
+ Collections.singletonList(
+ "create table sg211 (id1 string id,t1 STRING ATTRIBUTE, s1 FLOAT
measurement)");
+ for (String sql : sqls) {
+ RestUtils.nonQuery(httpClient, port, sqlHandler("test", sql));
+ }
+ String json =
+
"{\"database\":\"test\",\"column_types\":[\"ID\",\"ATTRIBUTE\",\"MEASUREMENT\"],\"timestamps\":[1635232143960,1635232153960,1635232163960,1635232183960],\"column_names\":[\"id1\",\"t1\",\"s1\"],\"data_types\":[\"STRING\",\"STRING\",\"FLOAT\"],\"values\":[[\"a11\",\"true\",11],[\"a11\",\"false\",22],[\"a13\",\"false1\",23],[\"a14\",\"false2\",24],[\"a15\",\"false3\",25]],\"table\":\"sg211\"}";
+ JsonObject result = RestUtils.insertTablet(httpClient, port, json);
+ assertEquals(305, Integer.parseInt(result.get("code").toString()));
+ assertEquals(
+ "values and data_types should have the same size",
result.get("message").getAsString());
+ }
+
+ public void rightInsertTablet(String json) {
+ JsonObject result = RestUtils.insertTablet(httpClient, port, json);
+ assertEquals(200, Integer.parseInt(result.get("code").toString()));
+ JsonObject queryResult =
+ RestUtils.query(
+ httpClient, port, sqlHandler("test", "select id1,t1,s1 from sg211
order by time"));
+ JsonArray jsonArray = queryResult.get("values").getAsJsonArray();
+ JsonArray jsonArray1 = jsonArray.get(0).getAsJsonArray();
+ assertEquals("a11", jsonArray1.get(0).getAsString());
+ assertEquals("false", jsonArray1.get(1).getAsString());
+ assertEquals(11f, jsonArray1.get(2).getAsFloat(), 0f);
+ }
+
+ public void prepareTableData() {
+ for (int i = 0; i < sqls.length; i++) {
+ JsonObject jsonObject = new JsonObject();
+ if (i > 0) {
+ jsonObject.addProperty("database", DATABASE);
+ } else {
+ jsonObject.addProperty("database", "");
+ }
+ jsonObject.addProperty("sql", sqls[i]);
+ RestUtils.nonQuery(httpClient, port, jsonObject.toString());
+ }
+ }
+}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/rest/it/IoTDBRestServiceInsertAlignedValuesIT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/rest/it/IoTDBRestServiceInsertAlignedValuesIT.java
new file mode 100644
index 00000000000..a3b8ca7a880
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/rest/it/IoTDBRestServiceInsertAlignedValuesIT.java
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.relational.it.rest.it;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+import org.apache.iotdb.itbase.category.RemoteIT;
+import org.apache.iotdb.itbase.env.BaseEnv;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.util.EntityUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class, RemoteIT.class})
+public class IoTDBRestServiceInsertAlignedValuesIT {
+
+ private int port = 18080;
+ private CloseableHttpClient httpClient = null;
+
+ @Before
+ public void setUp() throws Exception {
+ BaseEnv baseEnv = EnvFactory.getEnv();
+ baseEnv.getConfig().getDataNodeConfig().setEnableRestService(true);
+ baseEnv.initClusterEnvironment();
+ DataNodeWrapper portConflictDataNodeWrapper =
EnvFactory.getEnv().getDataNodeWrapper(0);
+ port = portConflictDataNodeWrapper.getRestServicePort();
+ httpClient = HttpClientBuilder.create().build();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ try {
+ if (httpClient != null) {
+ httpClient.close();
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ EnvFactory.getEnv().cleanClusterEnvironment();
+ }
+
+ private static final String DATABASE = "test";
+
+ private static final String[] sqls = new String[] {"CREATE DATABASE t1"};
+
+ public void ping() {
+ HttpGet httpGet = new HttpGet("http://127.0.0.1:" + port + "/ping");
+ CloseableHttpResponse response = null;
+ try {
+ for (int i = 0; i < 30; i++) {
+ try {
+ response = httpClient.execute(httpGet);
+ break;
+ } catch (Exception e) {
+ if (i == 29) {
+ throw e;
+ }
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+ }
+
+ HttpEntity responseEntity = response.getEntity();
+ String message = EntityUtils.toString(responseEntity, "utf-8");
+ JsonObject result = JsonParser.parseString(message).getAsJsonObject();
+ assertEquals(200, response.getStatusLine().getStatusCode());
+ assertEquals(200, Integer.parseInt(result.get("code").toString()));
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ } finally {
+ try {
+ if (response != null) {
+ response.close();
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+ }
+
+ @Test
+ public void test() {
+ ping();
+ prepareTableData();
+ testInsertAlignedValues();
+ testUpdatingAlignedValues();
+ testInsertAlignedValuesWithSameTimestamp();
+ testInsertWithWrongMeasurementNum1();
+ testInsertWithWrongMeasurementNum2();
+ testInsertWithDuplicatedMeasurements();
+ testInsertMultiRows();
+ testInsertLargeNumber();
+ testExtendTextColumn();
+ }
+
+ public String sqlHandler(String database, String sql) {
+ JsonObject json = new JsonObject();
+ json.addProperty("database", database);
+ json.addProperty("sql", sql);
+ return json.toString();
+ }
+
+ public void testInsertAlignedValues() {
+ List<String> sqls =
+ Arrays.asList(
+ "create table wf01 (id1 string id, status boolean measurement,
temperature float measurement)",
+ "insert into wf01(id1, time, status, temperature) values ('wt01',
4000, true, 17.1)",
+ "insert into wf01(id1, time, status, temperature) values ('wt01',
5000, true, 20.1)",
+ "insert into wf01(id1, time, status, temperature) values ('wt01',
6000, true, 22)");
+ for (String sql : sqls) {
+ nonQuery(sqlHandler("t1", sql));
+ }
+ JsonObject jsonObject = query(sqlHandler("t1", "select time, status from
wf01"));
+ JsonArray valuesList = jsonObject.getAsJsonArray("values");
+ for (int i = 0; i < valuesList.size(); i++) {
+ JsonArray jsonArray = valuesList.get(i).getAsJsonArray();
+ assertTrue(jsonArray.get(1).getAsBoolean());
+ }
+
+ jsonObject = query(sqlHandler("t1", "select time, status, temperature from
wf01"));
+ valuesList = jsonObject.getAsJsonArray("values");
+ for (int i = 0; i < valuesList.size(); i++) {
+ JsonArray jsonArray = valuesList.get(i).getAsJsonArray();
+ if (i == 0) {
+ assertEquals(4000, jsonArray.get(0).getAsLong());
+ assertTrue(jsonArray.get(1).getAsBoolean());
+ assertEquals(17.1, jsonArray.get(2).getAsDouble(), 0.1);
+ } else if (i == 1) {
+ assertEquals(5000, jsonArray.get(0).getAsLong());
+ assertTrue(jsonArray.get(1).getAsBoolean());
+ assertEquals(20.1, jsonArray.get(2).getAsDouble(), 0.1);
+ } else if (i == 2) {
+ assertEquals(6000, jsonArray.get(0).getAsLong());
+ assertTrue(jsonArray.get(1).getAsBoolean());
+ assertEquals(22.0, jsonArray.get(2).getAsDouble(), 0.1);
+ }
+ }
+ }
+
+ public void testUpdatingAlignedValues() {
+ List<String> sqls =
+ Arrays.asList(
+ "create table wf03 (id1 string id, status boolean measurement,
temperature float measurement)",
+ "insert into wf03(id1, time, status, temperature) values ('wt01',
4000, true, 17.1)",
+ "insert into wf03(id1, time, status) values ('wt01', 5000, true)",
+ "insert into wf03(id1, time, temperature)values ('wt01', 5000,
20.1)",
+ "insert into wf03(id1, time, temperature)values ('wt01', 6000,
22)");
+ for (String sql : sqls) {
+ nonQuery(sqlHandler("t1", sql));
+ }
+
+ JsonObject jsonObject = query(sqlHandler("t1", "select time, status from
wf03"));
+ JsonArray valuesList = jsonObject.getAsJsonArray("values");
+ for (int i = 0; i < valuesList.size(); i++) {
+ JsonArray jsonArray = valuesList.get(i).getAsJsonArray();
+ if (i >= 2) {
+ assertTrue(jsonArray.get(1).isJsonNull());
+ } else {
+ assertTrue(jsonArray.get(1).getAsBoolean());
+ }
+ }
+ jsonObject = query(sqlHandler("t1", "select time, status, temperature from
wf03"));
+ valuesList = jsonObject.getAsJsonArray("values");
+ for (int i = 0; i < valuesList.size(); i++) {
+ JsonArray jsonArray = valuesList.get(i).getAsJsonArray();
+ if (i == 0) {
+ assertEquals(4000, jsonArray.get(0).getAsLong());
+ assertTrue(jsonArray.get(1).getAsBoolean());
+ assertEquals(17.1, jsonArray.get(2).getAsDouble(), 0.1);
+ } else if (i == 1) {
+ assertEquals(5000, jsonArray.get(0).getAsLong());
+ assertTrue(jsonArray.get(1).getAsBoolean());
+ assertEquals(20.1, jsonArray.get(2).getAsDouble(), 0.1);
+ } else if (i == 2) {
+ assertEquals(6000, jsonArray.get(0).getAsLong());
+ assertTrue(jsonArray.get(1).isJsonNull());
+ assertEquals(22.0f, jsonArray.get(2).getAsFloat(), 0.1);
+ }
+ }
+ }
+
+ public void testInsertAlignedValuesWithSameTimestamp() {
+ List<String> sqls =
+ Arrays.asList(
+ "create table sg3 (id1 string id, s2 double measurement, s1 double
measurement)",
+ "insert into sg3(id1,time,s2) values('d1',1,2)",
+ "insert into sg3(id1,time,s1) values('d1',1,2)");
+ for (String sql : sqls) {
+ nonQuery(sqlHandler("t1", sql));
+ }
+
+ JsonObject jsonObject = query(sqlHandler("t1", "select time, s1, s2 from
sg3"));
+ JsonArray valuesList = jsonObject.getAsJsonArray("values");
+ for (int i = 0; i < valuesList.size(); i++) {
+ JsonArray jsonArray = valuesList.get(i).getAsJsonArray();
+ for (int c = 0; c < jsonArray.size(); c++) {
+ assertEquals(1, jsonArray.get(0).getAsLong());
+ assertEquals(2.0d, jsonArray.get(1).getAsDouble(), 0.1);
+ assertEquals(2.0d, jsonArray.get(2).getAsDouble(), 0.1);
+ }
+ }
+ }
+
+ public void testInsertWithWrongMeasurementNum1() {
+ nonQuery(
+ sqlHandler(
+ "t1",
+ "create table wf04 (id1 string id, status int32, temperature int32
measurement)"));
+ JsonObject jsonObject =
+ nonQuery(
+ sqlHandler(
+ "t1",
+ "insert into wf04(id1, time, status, temperature)
values('wt01', 11000, 100)"));
+ assertEquals(
+ "701: Inconsistent numbers of non-time column names and values: 3-2",
+ jsonObject.get("code") + ": " +
jsonObject.get("message").getAsString());
+ }
+
+ public void testInsertWithWrongMeasurementNum2() {
+ nonQuery(
+ sqlHandler(
+ "t1",
+ "create table wf04 (id1 string id, status int32, temperature int32
measurement)"));
+ JsonObject jsonObject =
+ nonQuery(
+ sqlHandler(
+ "t1",
+ "insert into wf05(id1, time, status, temperature)
values('wt01', 11000, 100, 300, 400)"));
+ assertEquals(
+ "701: Inconsistent numbers of non-time column names and values: 3-4",
+ jsonObject.get("code") + ": " +
jsonObject.get("message").getAsString());
+ }
+
+ public void testInsertWithDuplicatedMeasurements() {
+ nonQuery(
+ sqlHandler("t1", "create table wf07(id1 string id, s3 boolean
measurement, status int32)"));
+ JsonObject jsonObject =
+ nonQuery(
+ sqlHandler(
+ "t1",
+ "insert into wf07(id1, time, s3, status, status)
values('wt01', 100, true, 20.1, 20.2)"));
+ assertEquals(
+ "701: Insertion contains duplicated measurement: status",
+ jsonObject.get("code") + ": " +
jsonObject.get("message").getAsString());
+ }
+
+ public void testInsertMultiRows() {
+ nonQuery(
+ sqlHandler(
+ "t1", "create table sg8 (id1 string id, s1 int32 measurement, s2
int32 measurement)"));
+ JsonObject jsonObject =
+ nonQuery(
+ sqlHandler(
+ "t1",
+ "insert into sg8(id1, time, s1, s2) values('d1', 10, 2, 2),
('d1', 11, 3, '3'), ('d1', 12,12.11,false)"));
+ assertEquals(
+ "507: Fail to insert measurements [s1, s2] caused by [data type is not
consistent, input 12.11, registered INT32, data type is not consistent, input
false, registered INT32]",
+ jsonObject.get("code") + ": " +
jsonObject.get("message").getAsString());
+ }
+
+ public void testInsertLargeNumber() {
+ nonQuery(
+ sqlHandler(
+ "t1",
+ "create table sg9 (id1 string id, s98 int64 measurement, s99 int64
measurement)"));
+ JsonObject jsonObject =
+ nonQuery(
+ sqlHandler(
+ "t1",
+ "insert into sg9(id1, time, s98, s99) values('d1', 10, 2,
271840880000000000000000)"));
+ assertEquals(
+ "700: line 1:58: Invalid numeric literal: 271840880000000000000000",
+ jsonObject.get("code") + ": " +
jsonObject.get("message").getAsString());
+ }
+
+ public void testExtendTextColumn() {
+ List<String> sqls =
+ Arrays.asList(
+ "use t1",
+ "create table sg14 (id1 string id, s1 string measurement, s2
string measurement)",
+ "insert into sg14(id1,time,s1,s2) values('d1',1,'test','test')",
+ "insert into sg14(id1,time,s1,s2) values('d1',3,'test','test')",
+ "insert into sg14(id1,time,s1,s2) values('d1',3,'test','test')",
+ "insert into sg14(id1,time,s1,s2) values('d1',4,'test','test')",
+ "insert into sg14(id1,time,s1,s3) values('d1',5,'test','test')",
+ "insert into sg14(id1,time,s1,s2) values('d1',6,'test','test')",
+ "flush",
+ "insert into sg14(id1,time,s1,s3) values('d1',7,'test','test')");
+ try {
+ for (String sql : sqls) {
+ JsonObject jsonObject = new JsonObject();
+ jsonObject.addProperty("database", DATABASE);
+ jsonObject.addProperty("sql", sql);
+ nonQuery(jsonObject.toString());
+ }
+ } catch (Exception ignore) {
+
+ }
+ }
+
+ public void prepareTableData() {
+ for (int i = 0; i < sqls.length; i++) {
+ JsonObject jsonObject = new JsonObject();
+ if (i > 0) {
+ jsonObject.addProperty("database", DATABASE);
+ } else {
+ jsonObject.addProperty("database", "");
+ }
+ jsonObject.addProperty("sql", sqls[i]);
+ nonQuery(jsonObject.toString());
+ }
+ }
+
+ public JsonObject query(String json) {
+ return RestUtils.query(httpClient, port, json);
+ }
+
+ public JsonObject nonQuery(String json) {
+ return RestUtils.nonQuery(httpClient, port, json);
+ }
+}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/rest/it/RestUtils.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/rest/it/RestUtils.java
new file mode 100644
index 00000000000..cf70f9900f4
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/rest/it/RestUtils.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.relational.it.rest.it;
+
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.util.EntityUtils;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+
+import static org.junit.Assert.fail;
+
+public class RestUtils {
+
+ public static JsonObject insertTablet(CloseableHttpClient httpClient, int
port, String json) {
+ CloseableHttpResponse response = null;
+ try {
+ HttpPost httpPost = getHttpPost("http://127.0.0.1:" + port +
"/rest/table/v1/insertTablet");
+ httpPost.setEntity(new StringEntity(json, Charset.defaultCharset()));
+ response = httpClient.execute(httpPost);
+ HttpEntity responseEntity = response.getEntity();
+ String message = EntityUtils.toString(responseEntity, "utf-8");
+ return JsonParser.parseString(message).getAsJsonObject();
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ } finally {
+ try {
+ if (response != null) {
+ response.close();
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+ return new JsonObject();
+ }
+
+ private static HttpPost getHttpPost(String url) {
+ HttpPost httpPost = new HttpPost(url);
+ httpPost.addHeader("Content-type", "application/json; charset=utf-8");
+ httpPost.setHeader("Accept", "application/json");
+ String authorization = getAuthorization("root", "root");
+ httpPost.setHeader("Authorization", authorization);
+ return httpPost;
+ }
+
+ private static String getAuthorization(String username, String password) {
+ return Base64.getEncoder()
+ .encodeToString((username + ":" +
password).getBytes(StandardCharsets.UTF_8));
+ }
+
+ public static JsonObject query(CloseableHttpClient httpClient, int port,
String json) {
+ CloseableHttpResponse response = null;
+ try {
+ HttpPost httpPost = getHttpPost("http://127.0.0.1:" + port +
"/rest/table/v1/query");
+ httpPost.setEntity(new StringEntity(json, Charset.defaultCharset()));
+ response = httpClient.execute(httpPost);
+ HttpEntity responseEntity = response.getEntity();
+ String message = EntityUtils.toString(responseEntity, "utf-8");
+ return JsonParser.parseString(message).getAsJsonObject();
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ } finally {
+ try {
+ if (response != null) {
+ response.close();
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+ return new JsonObject();
+ }
+
+ public static JsonObject nonQuery(CloseableHttpClient httpClient, int port,
String json) {
+ CloseableHttpResponse response = null;
+ try {
+ HttpPost httpPost = getHttpPost("http://127.0.0.1:" + port +
"/rest/table/v1/nonQuery");
+ httpPost.setEntity(new StringEntity(json, Charset.defaultCharset()));
+ response = httpClient.execute(httpPost);
+ HttpEntity responseEntity = response.getEntity();
+ String message = EntityUtils.toString(responseEntity, "utf-8");
+ return JsonParser.parseString(message).getAsJsonObject();
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ } finally {
+ try {
+ if (response != null) {
+ response.close();
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+ return new JsonObject();
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/table/v1/handler/ExceptionHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/table/v1/handler/ExceptionHandler.java
new file mode 100644
index 00000000000..6bcf93bf8c7
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/table/v1/handler/ExceptionHandler.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iotdb.db.protocol.rest.table.v1.handler;
+
+import org.apache.iotdb.commons.auth.AuthException;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.metadata.DatabaseNotSetException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
+import org.apache.iotdb.db.protocol.rest.model.ExecutionStatus;
+import
org.apache.iotdb.db.queryengine.plan.relational.sql.parser.ParsingException;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.antlr.v4.runtime.misc.ParseCancellationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.Response.Status;
+
+import java.io.IOException;
+
+public class ExceptionHandler {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ExceptionHandler.class);
+
+ private ExceptionHandler() {}
+
+ public static ExecutionStatus tryCatchException(Exception e) {
+ ExecutionStatus responseResult = new ExecutionStatus();
+ if (e instanceof QueryProcessException) {
+ responseResult.setMessage(e.getMessage());
+ responseResult.setCode(((QueryProcessException) e).getErrorCode());
+ } else if (e instanceof DatabaseNotSetException) {
+ responseResult.setMessage(e.getMessage());
+ responseResult.setCode(((DatabaseNotSetException) e).getErrorCode());
+ } else if (e instanceof StorageEngineException) {
+ responseResult.setMessage(e.getMessage());
+ responseResult.setCode(((StorageEngineException) e).getErrorCode());
+ } else if (e instanceof AuthException) {
+ responseResult.setMessage(e.getMessage());
+ responseResult.setCode(Status.BAD_REQUEST.getStatusCode());
+ } else if (e instanceof IllegalPathException) {
+ responseResult.setMessage(e.getMessage());
+ responseResult.setCode(((IllegalPathException) e).getErrorCode());
+ } else if (e instanceof MetadataException) {
+ responseResult.setMessage(e.getMessage());
+ responseResult.setCode(((MetadataException) e).getErrorCode());
+ } else if (e instanceof IoTDBException) {
+ responseResult.setMessage(e.getMessage());
+ responseResult.setCode(((IoTDBException) e).getErrorCode());
+ } else if (e instanceof ParseCancellationException) {
+ responseResult.setMessage(e.getMessage());
+ responseResult.setCode(TSStatusCode.SQL_PARSE_ERROR.getStatusCode());
+ } else if (e instanceof StatementAnalyzeException) {
+ responseResult.setMessage(e.getMessage());
+ responseResult.setCode(TSStatusCode.METADATA_ERROR.getStatusCode());
+ } else if (e instanceof SemanticException) {
+ responseResult.setMessage(e.getMessage());
+ responseResult.setCode(TSStatusCode.SEMANTIC_ERROR.getStatusCode());
+ } else if (!(e instanceof IOException) && !(e instanceof
RuntimeException)) {
+ responseResult.setMessage(e.getMessage());
+
responseResult.setCode(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+ } else if (e instanceof ParsingException || e instanceof
IllegalArgumentException) {
+ responseResult.setMessage(e.getMessage());
+ responseResult.setCode(TSStatusCode.SQL_PARSE_ERROR.getStatusCode());
+ } else {
+ responseResult.setMessage(e.getMessage());
+
responseResult.setCode(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+ }
+ LOGGER.warn(e.getMessage(), e);
+ return responseResult;
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/table/v1/handler/ExecuteStatementHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/table/v1/handler/ExecuteStatementHandler.java
new file mode 100644
index 00000000000..6f29c11f6c6
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/table/v1/handler/ExecuteStatementHandler.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iotdb.db.protocol.rest.table.v1.handler;
+
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CountDevice;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DescribeTable;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Explain;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ExplainAnalyze;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Query;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowAINodes;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowCluster;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowClusterId;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowConfigNodes;
+import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowCurrentDatabase;
+import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowCurrentSqlDialect;
+import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowCurrentTimestamp;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowCurrentUser;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowDB;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowDataNodes;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowDevice;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowFunctions;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowIndex;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowPipePlugins;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowPipes;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowRegions;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowTables;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowVariables;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowVersion;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class ExecuteStatementHandler {
+ private ExecuteStatementHandler() {}
+
+ private static final Set<Class<? extends Statement>>
INVALID_STATEMENT_CLASSES = new HashSet<>();
+
+ static {
+ INVALID_STATEMENT_CLASSES.add(CountDevice.class);
+ INVALID_STATEMENT_CLASSES.add(DescribeTable.class);
+ INVALID_STATEMENT_CLASSES.add(Explain.class);
+ INVALID_STATEMENT_CLASSES.add(ExplainAnalyze.class);
+ INVALID_STATEMENT_CLASSES.add(Query.class);
+ INVALID_STATEMENT_CLASSES.add(ShowAINodes.class);
+ INVALID_STATEMENT_CLASSES.add(ShowCluster.class);
+ INVALID_STATEMENT_CLASSES.add(ShowClusterId.class);
+ INVALID_STATEMENT_CLASSES.add(ShowConfigNodes.class);
+ INVALID_STATEMENT_CLASSES.add(ShowCurrentDatabase.class);
+ INVALID_STATEMENT_CLASSES.add(ShowCurrentSqlDialect.class);
+ INVALID_STATEMENT_CLASSES.add(ShowCurrentTimestamp.class);
+ INVALID_STATEMENT_CLASSES.add(ShowCurrentUser.class);
+ INVALID_STATEMENT_CLASSES.add(ShowDB.class);
+ INVALID_STATEMENT_CLASSES.add(ShowDataNodes.class);
+ INVALID_STATEMENT_CLASSES.add(ShowDevice.class);
+ INVALID_STATEMENT_CLASSES.add(ShowFunctions.class);
+ INVALID_STATEMENT_CLASSES.add(ShowIndex.class);
+ INVALID_STATEMENT_CLASSES.add(ShowPipePlugins.class);
+ INVALID_STATEMENT_CLASSES.add(ShowPipes.class);
+ INVALID_STATEMENT_CLASSES.add(ShowRegions.class);
+ INVALID_STATEMENT_CLASSES.add(ShowTables.class);
+ INVALID_STATEMENT_CLASSES.add(ShowVariables.class);
+ INVALID_STATEMENT_CLASSES.add(ShowVersion.class);
+ }
+
+ public static boolean validateStatement(Statement statement) {
+ return !INVALID_STATEMENT_CLASSES.contains(statement.getClass());
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/table/v1/handler/QueryDataSetHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/table/v1/handler/QueryDataSetHandler.java
new file mode 100644
index 00000000000..149b99fc028
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/table/v1/handler/QueryDataSetHandler.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iotdb.db.protocol.rest.table.v1.handler;
+
+import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.db.protocol.rest.model.ExecutionStatus;
+import org.apache.iotdb.db.protocol.rest.table.v1.model.QueryDataSet;
+import org.apache.iotdb.db.queryengine.common.header.DatasetHeader;
+import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Query;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.block.TsBlock;
+
+import javax.ws.rs.core.Response;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+public class QueryDataSetHandler {
+
+ private QueryDataSetHandler() {}
+
+ /**
+ * @param actualRowSizeLimit max number of rows to return. no limit when
actualRowSizeLimit <= 0.
+ */
+ public static Response fillQueryDataSet(
+ IQueryExecution queryExecution, Statement statement, int
actualRowSizeLimit)
+ throws IoTDBException {
+ if (statement instanceof Query) {
+ return fillQueryDataSet(queryExecution, actualRowSizeLimit);
+ } else {
+ return fillOtherDataSet(queryExecution, actualRowSizeLimit);
+ }
+ }
+
+ public static Response fillQueryDataSet(
+ IQueryExecution queryExecution, final int actualRowSizeLimit) throws
IoTDBException {
+ QueryDataSet targetDataSet = new QueryDataSet();
+ int fetched = 0;
+ int columnNum = queryExecution.getOutputValueColumnCount();
+
+ DatasetHeader header = queryExecution.getDatasetHeader();
+ List<String> resultColumns = header.getRespColumns();
+ List<TSDataType> dataTypes = header.getRespDataTypes();
+ Map<String, Integer> headerMap = header.getColumnNameIndexMap();
+ for (int i = 0; i < resultColumns.size(); i++) {
+ targetDataSet.addColumnNamesItem(resultColumns.get(i));
+ targetDataSet.addDataTypesItem(dataTypes.get(i).name());
+ targetDataSet.addValuesItem(new ArrayList<>());
+ }
+ while (true) {
+ if (0 < actualRowSizeLimit && actualRowSizeLimit <= fetched) {
+ return Response.ok()
+ .entity(
+ new ExecutionStatus()
+ .code(TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode())
+ .message(
+ String.format(
+ "Dataset row size exceeded the given max row size
(%d)",
+ actualRowSizeLimit)))
+ .build();
+ }
+ Optional<TsBlock> optionalTsBlock = queryExecution.getBatchResult();
+ if (!optionalTsBlock.isPresent() || optionalTsBlock.get().isEmpty()) {
+ if (fetched == 0) {
+ targetDataSet.setValues(new ArrayList<>());
+ return Response.ok().entity(targetDataSet).build();
+ }
+ break;
+ }
+ TsBlock tsBlock = optionalTsBlock.get();
+ int currentCount = tsBlock.getPositionCount();
+
+ for (int k = 0; k < resultColumns.size(); k++) {
+ Column column = tsBlock.getColumn(headerMap.get(resultColumns.get(k)));
+ List<Object> targetDataSetColumn = targetDataSet.getValues().get(k);
+ for (int i = 0; i < currentCount; i++) {
+ fetched++;
+ if (column.isNull(i)) {
+ targetDataSetColumn.add(null);
+ } else {
+ targetDataSetColumn.add(
+ column.getDataType().equals(TSDataType.TEXT)
+ ?
column.getBinary(i).getStringValue(TSFileConfig.STRING_CHARSET)
+ : column.getObject(i));
+ }
+ }
+ if (k != columnNum - 1) {
+ fetched -= currentCount;
+ }
+ }
+ }
+ targetDataSet.setValues(convertColumnToRow(targetDataSet.getValues()));
+ return Response.ok().entity(targetDataSet).build();
+ }
+
+ private static Response fillOtherDataSet(
+ IQueryExecution queryExecution, final int actualRowSizeLimit) throws
IoTDBException {
+ QueryDataSet targetDataSet = new QueryDataSet();
+ int[] targetDataSetIndexToSourceDataSetIndex =
+ new int[queryExecution.getDatasetHeader().getRespColumns().size()];
+ initTargetDatasetOrderByOrderWithSourceDataSet(
+ queryExecution.getDatasetHeader(),
targetDataSetIndexToSourceDataSetIndex, targetDataSet);
+
+ return fillOtherQueryDataSet(
+ queryExecution, targetDataSetIndexToSourceDataSetIndex,
actualRowSizeLimit, targetDataSet);
+ }
+
+ private static void initTargetDatasetOrderByOrderWithSourceDataSet(
+ DatasetHeader datasetHeader,
+ int[] targetDataSetIndexToSourceDataSetIndex,
+ QueryDataSet targetDataSet) {
+ if (datasetHeader.getRespColumns() != null) {
+ for (int i = 0; i < datasetHeader.getRespColumns().size(); i++) {
+
targetDataSet.addColumnNamesItem(datasetHeader.getRespColumns().get(i));
+ targetDataSet.addValuesItem(new ArrayList<>());
+
targetDataSet.addDataTypesItem(datasetHeader.getRespDataTypes().get(i).name());
+ targetDataSetIndexToSourceDataSetIndex[i] = i;
+ }
+ }
+ }
+
+ private static Response fillOtherQueryDataSet(
+ IQueryExecution queryExecution,
+ int[] targetDataSetIndexToSourceDataSetIndex,
+ int actualRowSizeLimit,
+ QueryDataSet targetDataSet)
+ throws IoTDBException {
+ int fetched = 0;
+ int columnNum = queryExecution.getOutputValueColumnCount();
+ while (true) {
+ if (0 < actualRowSizeLimit && actualRowSizeLimit <= fetched) {
+ return Response.ok()
+ .entity(
+ new ExecutionStatus()
+ .code(TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode())
+ .message(
+ String.format(
+ "Dataset row size exceeded the given max row size
(%d)",
+ actualRowSizeLimit)))
+ .build();
+ }
+ Optional<TsBlock> optionalTsBlock = queryExecution.getBatchResult();
+ if (!optionalTsBlock.isPresent()) {
+ if (fetched == 0) {
+ targetDataSet.setValues(new ArrayList<>());
+ return Response.ok().entity(targetDataSet).build();
+ }
+ break;
+ }
+ TsBlock tsBlock = optionalTsBlock.get();
+ int currentCount = tsBlock.getPositionCount();
+ if (currentCount == 0) {
+ targetDataSet.setValues(new ArrayList<>());
+ return Response.ok().entity(targetDataSet).build();
+ }
+ for (int k = 0; k < columnNum; k++) {
+ Column column =
tsBlock.getColumn(targetDataSetIndexToSourceDataSetIndex[k]);
+ List<Object> targetDataSetColumn = targetDataSet.getValues().get(k);
+ for (int i = 0; i < currentCount; i++) {
+ fetched++;
+ if (column.isNull(i)) {
+ targetDataSetColumn.add(null);
+ } else {
+ targetDataSetColumn.add(
+ column.getDataType().equals(TSDataType.TEXT)
+ ?
column.getBinary(i).getStringValue(TSFileConfig.STRING_CHARSET)
+ : column.getObject(i));
+ }
+ }
+ if (k != columnNum - 1) {
+ fetched -= currentCount;
+ }
+ }
+ }
+ targetDataSet.setValues(convertColumnToRow(targetDataSet.getValues()));
+ return Response.ok().entity(targetDataSet).build();
+ }
+
+ private static List<List<Object>> convertColumnToRow(List<List<Object>>
values) {
+ List<List<Object>> result = new ArrayList<>();
+
+ if (values.isEmpty() || values.get(0).isEmpty()) {
+ return result;
+ }
+
+ int numRows = values.get(0).size();
+ for (int i = 0; i < numRows; i++) {
+ List<Object> newRow = new ArrayList<>();
+ for (List<Object> value : values) {
+ newRow.add(value.get(i));
+ }
+ result.add(newRow);
+ }
+ return result;
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/table/v1/handler/RequestValidationHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/table/v1/handler/RequestValidationHandler.java
new file mode 100644
index 00000000000..81624ba9031
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/table/v1/handler/RequestValidationHandler.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iotdb.db.protocol.rest.table.v1.handler;
+
+import org.apache.iotdb.db.protocol.rest.table.v1.model.InsertTabletRequest;
+import org.apache.iotdb.db.protocol.rest.table.v1.model.SQL;
+
+import org.apache.commons.lang3.Validate;
+import org.apache.tsfile.enums.TSDataType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+public class RequestValidationHandler {
+
+ private RequestValidationHandler() {}
+
+ public static void validateQuerySQL(SQL sql) {
+ validateSQL(sql);
+ if (sql.getDatabase().isEmpty()) {
+ throw new IllegalArgumentException("database should not be an empty
string");
+ }
+ }
+
+ public static void validateSQL(SQL sql) {
+ Objects.requireNonNull(sql.getSql(), "sql should not be null");
+ Objects.requireNonNull(sql.getDatabase(), "database should not be null");
+ if (sql.getRowLimit() != null) {
+ Validate.isTrue(sql.getRowLimit() > 0, "row_limit should be positive");
+ }
+ }
+
+ public static void validateInsertTabletRequest(InsertTabletRequest
insertTabletRequest) {
+ Objects.requireNonNull(insertTabletRequest.getDatabase(), "database should
not be null");
+ Objects.requireNonNull(insertTabletRequest.getTable(), "table should not
be null");
+ Objects.requireNonNull(insertTabletRequest.getColumnNames(), "column_names
should not be null");
+ Objects.requireNonNull(insertTabletRequest.getColumnTypes(), "column_types
should not be null");
+ Objects.requireNonNull(insertTabletRequest.getDataTypes(), "data_types
should not be null");
+ Objects.requireNonNull(insertTabletRequest.getTimestamps(), "timestamps
should not be null");
+ Objects.requireNonNull(insertTabletRequest.getValues(), "values should not
be null");
+ List<String> errorMessages = new ArrayList<>();
+ String table = insertTabletRequest.getTable();
+ if (insertTabletRequest.getColumnTypes().size() == 0
+ || insertTabletRequest.getColumnTypes().size()
+ != insertTabletRequest.getColumnNames().size()) {
+ errorMessages.add("column_names and column_types should have the same
size");
+ }
+ if (insertTabletRequest.getColumnTypes().size() !=
insertTabletRequest.getDataTypes().size()) {
+ errorMessages.add("column_types and data_types should have the same
size");
+ }
+ if (insertTabletRequest.getTimestamps().size() !=
insertTabletRequest.getValues().size()) {
+ errorMessages.add("values and data_types should have the same size");
+ }
+
+ for (int i = 0; i < insertTabletRequest.getDataTypes().size(); i++) {
+ String dataType = insertTabletRequest.getDataTypes().get(i);
+ if (isDataType(dataType)) {
+ errorMessages.add("The " + dataType + " data type of " + table + " is
illegal");
+ }
+ }
+
+ int dataTypeSize = insertTabletRequest.getDataTypes().size();
+ for (int i = 0; i < insertTabletRequest.getValues().size(); i++) {
+ List<Object> values = insertTabletRequest.getValues().get(i);
+ if (dataTypeSize != values.size()) {
+ errorMessages.add(
+ "The number of values in the " + i + "th row is not equal to the
data_types size");
+ }
+ }
+
+ if (!errorMessages.isEmpty()) {
+ throw new RuntimeException(String.join(",", errorMessages));
+ }
+ }
+
+ private static boolean isDataType(String dataType) {
+ try {
+ TSDataType.valueOf(dataType.toUpperCase());
+ } catch (IllegalArgumentException e) {
+ return true;
+ }
+ return false;
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/table/v1/handler/StatementConstructionHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/table/v1/handler/StatementConstructionHandler.java
new file mode 100644
index 00000000000..a9b770915c5
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/table/v1/handler/StatementConstructionHandler.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iotdb.db.protocol.rest.table.v1.handler;
+
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
+import org.apache.iotdb.db.exception.WriteProcessRejectException;
+import org.apache.iotdb.db.protocol.rest.table.v1.model.InsertTabletRequest;
+import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+import org.apache.iotdb.db.utils.TimestampPrecisionUtils;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.BitMap;
+import org.apache.tsfile.write.record.Tablet;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Locale;
+
+public class StatementConstructionHandler {
+
+ private static final DataNodeDevicePathCache DEVICE_PATH_CACHE =
+ DataNodeDevicePathCache.getInstance();
+
+ private StatementConstructionHandler() {}
+
+ public static InsertTabletStatement constructInsertTabletStatement(
+ InsertTabletRequest insertTabletReq)
+ throws IllegalPathException, WriteProcessRejectException {
+ InsertTabletStatement insertStatement = new InsertTabletStatement();
+
insertStatement.setDevicePath(DEVICE_PATH_CACHE.getPartialPath(insertTabletReq.getTable()));
+
insertStatement.setMeasurements(insertTabletReq.getColumnNames().toArray(new
String[0]));
+ long[] timestamps =
+
insertTabletReq.getTimestamps().stream().mapToLong(Long::longValue).toArray();
+ if (timestamps.length != 0) {
+
TimestampPrecisionUtils.checkTimestampPrecision(timestamps[timestamps.length -
1]);
+ }
+ insertStatement.setTimes(timestamps);
+ int columnSize = insertTabletReq.getColumnNames().size();
+ int rowSize = insertTabletReq.getTimestamps().size();
+ List<List<Object>> rawData = insertTabletReq.getValues();
+ Object[] columns = new Object[columnSize];
+ BitMap[] bitMaps = new BitMap[columnSize];
+ List<String> rawDataType = insertTabletReq.getDataTypes();
+ TSDataType[] dataTypes = new TSDataType[columnSize];
+
+ for (int i = 0; i < columnSize; i++) {
+ dataTypes[i] =
TSDataType.valueOf(rawDataType.get(i).toUpperCase(Locale.ROOT));
+ }
+
+ for (int columnIndex = 0; columnIndex < columnSize; columnIndex++) {
+ bitMaps[columnIndex] = new BitMap(rowSize);
+ switch (dataTypes[columnIndex]) {
+ case BOOLEAN:
+ boolean[] booleanValues = new boolean[rowSize];
+ for (int rowIndex = 0; rowIndex < rowSize; rowIndex++) {
+ Object data = rawData.get(rowIndex).get(columnIndex);
+ if (data == null) {
+ bitMaps[columnIndex].mark(rowIndex);
+ } else {
+ if ("1".equals(data.toString())) {
+ booleanValues[rowIndex] = true;
+ } else if ("0".equals(data.toString())) {
+ booleanValues[rowIndex] = false;
+ } else {
+ booleanValues[rowIndex] = (Boolean) data;
+ }
+ }
+ }
+ columns[columnIndex] = booleanValues;
+ break;
+ case INT32:
+ case DATE:
+ int[] intValues = new int[rowSize];
+ for (int rowIndex = 0; rowIndex < rowSize; rowIndex++) {
+ Object object = rawData.get(rowIndex).get(columnIndex);
+ if (object == null) {
+ bitMaps[columnIndex].mark(rowIndex);
+ } else if (object instanceof Integer) {
+ intValues[rowIndex] = (int) object;
+ } else {
+ throw new WriteProcessRejectException(
+ "unsupported data type: " + object.getClass().toString());
+ }
+ }
+ columns[columnIndex] = intValues;
+ break;
+ case INT64:
+ case TIMESTAMP:
+ long[] longValues = new long[rowSize];
+ for (int rowIndex = 0; rowIndex < rowSize; rowIndex++) {
+ Object object = rawData.get(rowIndex).get(columnIndex);
+ if (object == null) {
+ bitMaps[columnIndex].mark(rowIndex);
+ } else if (object instanceof Integer) {
+ longValues[rowIndex] = (int) object;
+ } else if (object instanceof Long) {
+ longValues[rowIndex] = (long) object;
+ } else {
+ throw new WriteProcessRejectException(
+ "unsupported data type: " + object.getClass().toString());
+ }
+ }
+ columns[columnIndex] = longValues;
+ break;
+ case FLOAT:
+ float[] floatValues = new float[rowSize];
+ for (int rowIndex = 0; rowIndex < rowSize; rowIndex++) {
+ Object data = rawData.get(rowIndex).get(columnIndex);
+ if (data == null) {
+ bitMaps[columnIndex].mark(rowIndex);
+ } else {
+ floatValues[rowIndex] = Float.parseFloat(String.valueOf(data));
+ }
+ }
+ columns[columnIndex] = floatValues;
+ break;
+ case DOUBLE:
+ double[] doubleValues = new double[rowSize];
+ for (int rowIndex = 0; rowIndex < rowSize; rowIndex++) {
+ if (rawData.get(rowIndex).get(columnIndex) == null) {
+ bitMaps[columnIndex].mark(rowIndex);
+ } else {
+ doubleValues[rowIndex] =
+
Double.parseDouble(String.valueOf(rawData.get(rowIndex).get(columnIndex)));
+ }
+ }
+ columns[columnIndex] = doubleValues;
+ break;
+ case TEXT:
+ case BLOB:
+ case STRING:
+ Binary[] binaryValues = new Binary[rowSize];
+ for (int rowIndex = 0; rowIndex < rowSize; rowIndex++) {
+ if (rawData.get(rowIndex).get(columnIndex) == null) {
+ bitMaps[columnIndex].mark(rowIndex);
+ binaryValues[rowIndex] = new
Binary("".getBytes(StandardCharsets.UTF_8));
+ } else {
+ binaryValues[rowIndex] =
+ new Binary(
+ rawData
+ .get(rowIndex)
+ .get(columnIndex)
+ .toString()
+ .getBytes(StandardCharsets.UTF_8));
+ }
+ }
+ columns[columnIndex] = binaryValues;
+ break;
+ default:
+ throw new IllegalArgumentException("Invalid input: " +
rawDataType.get(columnIndex));
+ }
+ }
+ insertStatement.setColumns(columns);
+ insertStatement.setBitMaps(bitMaps);
+ insertStatement.setRowCount(rowSize);
+ insertStatement.setDataTypes(dataTypes);
+ insertStatement.setAligned(false);
+ insertStatement.setWriteToTable(true);
+ TsTableColumnCategory[] columnCategories =
+ new TsTableColumnCategory[insertTabletReq.getColumnTypes().size()];
+ for (int i = 0; i < columnCategories.length; i++) {
+ columnCategories[i] =
+ TsTableColumnCategory.fromTsFileColumnType(
+
Tablet.ColumnCategory.valueOf(insertTabletReq.getColumnTypes().get(i)));
+ }
+ insertStatement.setColumnCategories(columnCategories);
+
+ return insertStatement;
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/table/v1/impl/RestApiServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/table/v1/impl/RestApiServiceImpl.java
new file mode 100644
index 00000000000..f881ef653c4
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/table/v1/impl/RestApiServiceImpl.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iotdb.db.protocol.rest.table.v1.impl;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.rest.IoTDBRestServiceDescriptor;
+import org.apache.iotdb.db.protocol.rest.table.v1.NotFoundException;
+import org.apache.iotdb.db.protocol.rest.table.v1.RestApiService;
+import org.apache.iotdb.db.protocol.rest.table.v1.handler.ExceptionHandler;
+import
org.apache.iotdb.db.protocol.rest.table.v1.handler.ExecuteStatementHandler;
+import org.apache.iotdb.db.protocol.rest.table.v1.handler.QueryDataSetHandler;
+import
org.apache.iotdb.db.protocol.rest.table.v1.handler.RequestValidationHandler;
+import
org.apache.iotdb.db.protocol.rest.table.v1.handler.StatementConstructionHandler;
+import org.apache.iotdb.db.protocol.rest.table.v1.model.ExecutionStatus;
+import org.apache.iotdb.db.protocol.rest.table.v1.model.InsertTabletRequest;
+import org.apache.iotdb.db.protocol.rest.table.v1.model.SQL;
+import org.apache.iotdb.db.protocol.session.IClientSession;
+import org.apache.iotdb.db.protocol.session.SessionManager;
+import org.apache.iotdb.db.protocol.thrift.OperationType;
+import org.apache.iotdb.db.queryengine.plan.Coordinator;
+import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
+import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution;
+import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.parser.SqlParser;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+import org.apache.iotdb.db.utils.CommonUtils;
+import org.apache.iotdb.db.utils.SetThreadName;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.SecurityContext;
+
+import java.time.ZoneId;
+import java.util.List;
+import java.util.Optional;
+
+public class RestApiServiceImpl extends RestApiService {
+ private static final Coordinator COORDINATOR = Coordinator.getInstance();
+
+ private static final SessionManager SESSION_MANAGER =
SessionManager.getInstance();
+
+ private static final IoTDBConfig config =
IoTDBDescriptor.getInstance().getConfig();
+
+ private final Integer defaultQueryRowLimit;
+
+ public RestApiServiceImpl() {
+ defaultQueryRowLimit =
+
IoTDBRestServiceDescriptor.getInstance().getConfig().getRestQueryDefaultRowSizeLimit();
+ }
+
+ public Response executeQueryStatement(SQL sql, SecurityContext
securityContext)
+ throws NotFoundException {
+ SqlParser relationSqlParser = new SqlParser();
+ Long queryId = null;
+ Statement statement = null;
+ long startTime = System.nanoTime();
+ try {
+ RequestValidationHandler.validateQuerySQL(sql);
+ IClientSession clientSession =
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+ clientSession.setDatabaseName(sql.getDatabase());
+ clientSession.setSqlDialect(IClientSession.SqlDialect.TABLE);
+ statement =
+ relationSqlParser.createStatement(sql.getSql(),
ZoneId.systemDefault(), clientSession);
+ if (statement == null) {
+ return Response.ok()
+ .entity(
+ new org.apache.iotdb.db.protocol.rest.model.ExecutionStatus()
+ .code(TSStatusCode.SQL_PARSE_ERROR.getStatusCode())
+ .message("This operation type is not supported"))
+ .build();
+ }
+
+ if (ExecuteStatementHandler.validateStatement(statement)) {
+ return Response.ok()
+ .entity(
+ new org.apache.iotdb.db.protocol.rest.model.ExecutionStatus()
+ .code(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+ .message(TSStatusCode.EXECUTE_STATEMENT_ERROR.name()))
+ .build();
+ }
+
+ queryId = SESSION_MANAGER.requestQueryId();
+ Metadata metadata = LocalExecutionPlanner.getInstance().metadata;
+
+ ExecutionResult result =
+ COORDINATOR.executeForTableModel(
+ statement,
+ relationSqlParser,
+ clientSession,
+ queryId,
+ SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
+ sql.getSql(),
+ metadata,
+ config.getQueryTimeoutThreshold(),
+ true);
+ if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ && result.status.code !=
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+ return Response.ok()
+ .entity(
+ new ExecutionStatus()
+ .code(result.status.getCode())
+ .message(result.status.getMessage()))
+ .build();
+ }
+ IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
+ try (SetThreadName threadName = new
SetThreadName(result.queryId.getId())) {
+ return QueryDataSetHandler.fillQueryDataSet(
+ queryExecution,
+ statement,
+ sql.getRowLimit() == null ? defaultQueryRowLimit :
sql.getRowLimit());
+ }
+ } catch (Exception e) {
+ return
Response.ok().entity(ExceptionHandler.tryCatchException(e)).build();
+ } finally {
+ long costTime = System.nanoTime() - startTime;
+ Optional.ofNullable(statement)
+ .ifPresent(
+ s ->
+ CommonUtils.addStatementExecutionLatency(
+ OperationType.EXECUTE_QUERY_STATEMENT, s.toString(),
costTime));
+ if (queryId != null) {
+ COORDINATOR.cleanupQueryExecution(queryId);
+ }
+ }
+ }
+
+ @Override
+ public Response insertTablet(
+ InsertTabletRequest insertTabletRequest, SecurityContext securityContext)
+ throws NotFoundException {
+ Long queryId = null;
+ long startTime = System.nanoTime();
+ InsertTabletStatement insertTabletStatement = null;
+ try {
+
RequestValidationHandler.validateInsertTabletRequest(insertTabletRequest);
+ insertTabletStatement =
+
StatementConstructionHandler.constructInsertTabletStatement(insertTabletRequest);
+ IClientSession clientSession =
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+ clientSession.setDatabaseName(insertTabletRequest.getDatabase());
+ clientSession.setSqlDialect(IClientSession.SqlDialect.TABLE);
+ queryId = SESSION_MANAGER.requestQueryId();
+ Metadata metadata = LocalExecutionPlanner.getInstance().metadata;
+
+ SqlParser relationSqlParser = new SqlParser();
+ ExecutionResult result =
+ COORDINATOR.executeForTableModel(
+ insertTabletStatement,
+ relationSqlParser,
+ clientSession,
+ queryId,
+ SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
+ "",
+ metadata,
+ config.getQueryTimeoutThreshold());
+
+ return responseGenerateHelper(result);
+ } catch (Exception e) {
+ return
Response.ok().entity(ExceptionHandler.tryCatchException(e)).build();
+ } finally {
+ long costTime = System.nanoTime() - startTime;
+ Optional.ofNullable(insertTabletStatement)
+ .ifPresent(
+ s ->
+ CommonUtils.addStatementExecutionLatency(
+ OperationType.INSERT_TABLET, s.getType().name(),
costTime));
+ if (queryId != null) {
+ COORDINATOR.cleanupQueryExecution(queryId);
+ }
+ }
+ }
+
+ @Override
+ public Response executeNonQueryStatement(SQL sql, SecurityContext
securityContext)
+ throws NotFoundException {
+ SqlParser relationSqlParser = new SqlParser();
+ Long queryId = null;
+ Statement statement = null;
+ long startTime = System.nanoTime();
+ try {
+ IClientSession clientSession =
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+ RequestValidationHandler.validateSQL(sql);
+ clientSession.setDatabaseName(sql.getDatabase());
+ clientSession.setSqlDialect(IClientSession.SqlDialect.TABLE);
+ statement =
+ relationSqlParser.createStatement(sql.getSql(),
ZoneId.systemDefault(), clientSession);
+
+ if (statement == null) {
+ return Response.ok()
+ .entity(
+ new org.apache.iotdb.db.protocol.rest.model.ExecutionStatus()
+ .code(TSStatusCode.SQL_PARSE_ERROR.getStatusCode())
+ .message("This operation type is not supported"))
+ .build();
+ }
+ if (!ExecuteStatementHandler.validateStatement(statement)) {
+ return Response.ok()
+ .entity(
+ new org.apache.iotdb.db.protocol.rest.model.ExecutionStatus()
+ .code(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+ .message(TSStatusCode.EXECUTE_STATEMENT_ERROR.name()))
+ .build();
+ }
+ queryId = SESSION_MANAGER.requestQueryId();
+ Metadata metadata = LocalExecutionPlanner.getInstance().metadata;
+ ExecutionResult result =
+ COORDINATOR.executeForTableModel(
+ statement,
+ relationSqlParser,
+ clientSession,
+ queryId,
+ SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
+ sql.getSql(),
+ metadata,
+ config.getQueryTimeoutThreshold(),
+ false);
+ if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ && result.status.code !=
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+ return Response.ok()
+ .entity(
+ new ExecutionStatus()
+ .code(result.status.getCode())
+ .message(result.status.getMessage()))
+ .build();
+ }
+ return responseGenerateHelper(result);
+ } catch (Exception e) {
+ return
Response.ok().entity(ExceptionHandler.tryCatchException(e)).build();
+ } finally {
+ long costTime = System.nanoTime() - startTime;
+ Optional.ofNullable(statement)
+ .ifPresent(
+ s ->
+ CommonUtils.addStatementExecutionLatency(
+ OperationType.EXECUTE_NON_QUERY_PLAN, s.toString(),
costTime));
+ if (queryId != null) {
+ COORDINATOR.cleanupQueryExecution(queryId);
+ }
+ }
+ }
+
+ private Response responseGenerateHelper(ExecutionResult result) {
+ if (result.status.code == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ || result.status.code ==
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+ return Response.ok()
+ .entity(
+ new ExecutionStatus()
+ .code(TSStatusCode.SUCCESS_STATUS.getStatusCode())
+ .message(TSStatusCode.SUCCESS_STATUS.name()))
+ .build();
+ } else if (result.status.code ==
TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+ List<TSStatus> subStatus = result.status.getSubStatus();
+ StringBuilder errMsg = new StringBuilder();
+ for (TSStatus status : subStatus) {
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ && status.getCode() !=
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+ errMsg.append(status.getMessage()).append("; ");
+ }
+ }
+ return Response.ok()
+ .entity(
+ new ExecutionStatus()
+ .code(TSStatusCode.MULTIPLE_ERROR.getStatusCode())
+ .message(errMsg.toString()))
+ .build();
+ } else {
+ return Response.ok()
+ .entity(
+ new ExecutionStatus()
+ .code(result.status.getCode())
+ .message(result.status.getMessage()))
+ .build();
+ }
+ }
+}
diff --git a/iotdb-protocol/openapi/pom.xml b/iotdb-protocol/openapi/pom.xml
index 9b42d36494a..c82eebcc0cf 100644
--- a/iotdb-protocol/openapi/pom.xml
+++ b/iotdb-protocol/openapi/pom.xml
@@ -163,6 +163,33 @@
</configOptions>
</configuration>
</execution>
+ <execution>
+ <id>generate-java-rest-codes-table-v1</id>
+ <goals>
+ <goal>generate</goal>
+ </goals>
+ <phase>generate-sources</phase>
+ <configuration>
+
<inputSpec>${project.basedir}/src/main/openapi3/iotdb_rest_table_v1.yaml</inputSpec>
+
<output>${project.build.directory}/generated-sources/java</output>
+
<apiPackage>org.apache.iotdb.db.protocol.rest.table.v1</apiPackage>
+
<modelPackage>org.apache.iotdb.db.protocol.rest.table.v1.model</modelPackage>
+
<invokerPackage>org.apache.iotdb.db.protocol.rest.table.v1.invoker</invokerPackage>
+ <generatorName>jaxrs-jersey</generatorName>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>iotdb-rest-service</artifactId>
+
<artifactVersion>${project.version}</artifactVersion>
+ <addCompileSourceRoot>true</addCompileSourceRoot>
+ <configOptions>
+ <licenseName>Apache License 2.0</licenseName>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>iotdb-rest-service</artifactId>
+
<artifactVersion>${project.version}</artifactVersion>
+ <dateLibrary>java8</dateLibrary>
+ <useGzipFeature>true</useGzipFeature>
+ </configOptions>
+ </configuration>
+ </execution>
</executions>
</plugin>
<plugin>
diff --git a/iotdb-protocol/openapi/src/main/openapi3/iotdb_rest_table_v1.yaml
b/iotdb-protocol/openapi/src/main/openapi3/iotdb_rest_table_v1.yaml
new file mode 100644
index 00000000000..a02784d4cbf
--- /dev/null
+++ b/iotdb-protocol/openapi/src/main/openapi3/iotdb_rest_table_v1.yaml
@@ -0,0 +1,167 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+openapi: 3.0.0
+info:
+ title: iotdb_rest_table_v1
+ description: IoTDB Rest API..
+ license:
+ name: Apache 2.0
+ url: https://www.apache.org/licenses/LICENSE-2.0.html
+ version: 1.0.0
+servers:
+ - url: http://127.0.0.1:18080/
+ description: api
+security:
+ - basic: []
+paths:
+ /rest/table/v1/insertTablet:
+ post:
+ summary: insertTablet
+ description: insertTablet
+ operationId: insertTablet
+ requestBody:
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/InsertTabletRequest'
+ responses:
+ "200":
+ description: ExecutionStatus
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/ExecutionStatus'
+
+ /rest/table/v1/nonQuery:
+ post:
+ summary: executeNonQueryStatement
+ description: executeNonQueryStatement
+ operationId: executeNonQueryStatement
+ requestBody:
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/SQL'
+ responses:
+ "200":
+ description: ExecutionStatus
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/ExecutionStatus'
+
+ /rest/table/v1/query:
+ post:
+ summary: executeQueryStatement
+ description: executeQueryStatement
+ operationId: executeQueryStatement
+ requestBody:
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/SQL'
+ responses:
+ "200":
+ description: QueryDataSet
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/QueryDataSet'
+
+components:
+ schemas:
+ SQL:
+ title: SQL
+ type: object
+ properties:
+ database:
+ type: string
+ sql:
+ type: string
+ row_limit:
+ type: integer
+ format: int32
+
+ InsertTabletRequest:
+ title: InsertTabletRequest
+ type: object
+ properties:
+ timestamps:
+ type: array
+ items:
+ type: integer
+ format: int64
+ column_names:
+ type: array
+ items:
+ type: string
+ column_types:
+ type: array
+ items:
+ type: string
+ data_types:
+ type: array
+ items:
+ type: string
+ values:
+ type: array
+ items:
+ type: array
+ items:
+ type: object
+ table:
+ type: string
+ database:
+ type: string
+
+ ExecutionStatus:
+ type: object
+ properties:
+ code:
+ type: integer
+ format: int32
+ message:
+ type: string
+
+ QueryDataSet:
+ type: object
+ properties:
+ column_names:
+ type: array
+ items:
+ type: string
+ data_types:
+ type: array
+ items:
+ type: string
+ values:
+ type: array
+ items:
+ type: array
+ items:
+ type: object
+ securitySchemes:
+ basic:
+ type: http
+ scheme: basic
+ APIKey:
+ type: apiKey
+ name: API Key
+ in: header