This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch mpp_rest in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a26baa5aa4a2f78aa297a7ccd66eaf5f59c097bc Author: HTHou <[email protected]> AuthorDate: Thu Jun 30 16:43:34 2022 +0800 [IOTDB-3322] Support RestApi for MPP framework --- .../iotdb/db/protocol/mpprest/MPPRestService.java | 162 +++++++++++++ .../protocol/mpprest/filter/ApiOriginFilter.java | 45 ++++ .../mpprest/filter/AuthorizationFilter.java | 138 +++++++++++ .../mpprest/filter/BasicSecurityContext.java | 56 +++++ .../iotdb/db/protocol/mpprest/filter/User.java | 38 ++++ .../db/protocol/mpprest/filter/UserCache.java | 56 +++++ .../mpprest/handler/AuthorizationHandler.java | 47 ++++ .../protocol/mpprest/handler/ExceptionHandler.java | 75 ++++++ .../mpprest/handler/QueryDataSetHandler.java | 252 +++++++++++++++++++++ .../mpprest/handler/RequestValidationHandler.java | 53 +++++ .../handler/StatementConstructionHandler.java | 157 +++++++++++++ .../mpprest/impl/GrafanaApiServiceImpl.java | 245 ++++++++++++++++++++ .../protocol/mpprest/impl/PingApiServiceImpl.java | 51 +++++ .../protocol/mpprest/impl/RestApiServiceImpl.java | 193 ++++++++++++++++ 14 files changed, 1568 insertions(+) diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/MPPRestService.java b/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/MPPRestService.java new file mode 100644 index 0000000000..859cfca98f --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/MPPRestService.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.iotdb.db.protocol.mpprest; + +import org.apache.iotdb.commons.exception.StartupException; +import org.apache.iotdb.commons.service.IService; +import org.apache.iotdb.commons.service.ServiceType; +import org.apache.iotdb.db.conf.rest.IoTDBRestServiceConfig; +import org.apache.iotdb.db.conf.rest.IoTDBRestServiceDescriptor; +import org.apache.iotdb.db.protocol.mpprest.filter.ApiOriginFilter; + +import org.eclipse.jetty.http.HttpVersion; +import org.eclipse.jetty.server.HttpConfiguration; +import org.eclipse.jetty.server.HttpConnectionFactory; +import org.eclipse.jetty.server.SecureRequestCustomizer; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.server.SslConnectionFactory; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.eclipse.jetty.util.ssl.SslContextFactory; +import org.glassfish.jersey.servlet.ServletContainer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.servlet.DispatcherType; + +import java.util.EnumSet; + +public class MPPRestService implements IService { + + private static final Logger LOGGER = LoggerFactory.getLogger(MPPRestService.class); + + private static Server server; + + private void startSSL( + int port, + String keyStorePath, + String trustStorePath, + String keyStorePwd, + String trustStorePwd, + int idleTime, + boolean clientAuth) { + server = new Server(); + + HttpConfiguration httpsConfig = new HttpConfiguration(); + httpsConfig.setSecurePort(port); + httpsConfig.addCustomizer(new SecureRequestCustomizer()); + + SslContextFactory.Server sslContextFactory = new SslContextFactory.Server(); + sslContextFactory.setKeyStorePath(keyStorePath); + sslContextFactory.setKeyStorePassword(keyStorePwd); + if (clientAuth) { + sslContextFactory.setTrustStorePath(trustStorePath); + sslContextFactory.setTrustStorePassword(trustStorePwd); + sslContextFactory.setNeedClientAuth(clientAuth); + } + + ServerConnector httpsConnector = + new ServerConnector( + server, + new SslConnectionFactory(sslContextFactory, HttpVersion.HTTP_1_1.asString()), + new HttpConnectionFactory(httpsConfig)); + httpsConnector.setPort(port); + httpsConnector.setIdleTimeout(idleTime); + server.addConnector(httpsConnector); + + server.setHandler(constructServletContextHandler()); + serverStart(); + } + + private void startNonSSL(int port) { + server = new Server(port); + server.setHandler(constructServletContextHandler()); + serverStart(); + } + + private ServletContextHandler constructServletContextHandler() { + ServletContextHandler context = new ServletContextHandler(ServletContextHandler.NO_SESSIONS); + context.addFilter( + ApiOriginFilter.class, "/*", EnumSet.of(DispatcherType.INCLUDE, DispatcherType.REQUEST)); + ServletHolder holder = context.addServlet(ServletContainer.class, "/*"); + holder.setInitOrder(1); + holder.setInitParameter( + "jersey.config.server.provider.packages", + "io.swagger.jaxrs.listing, io.swagger.sample.resource, org.apache.iotdb.db.protocol.mpprest"); + holder.setInitParameter( + "jersey.config.server.provider.classnames", + "org.glassfish.jersey.media.multipart.MultiPartFeature"); + holder.setInitParameter("jersey.config.server.wadl.disableWadl", "true"); + context.setContextPath("/"); + return context; + } + + private void serverStart() { + try { + server.start(); + } catch (Exception e) { + LOGGER.warn("RestService failed to start: {}", e.getMessage()); + server.destroy(); + } + LOGGER.info("start RestService successfully"); + } + + @Override + public void start() throws StartupException { + IoTDBRestServiceConfig config = IoTDBRestServiceDescriptor.getInstance().getConfig(); + if (IoTDBRestServiceDescriptor.getInstance().getConfig().isEnableHttps()) { + startSSL( + config.getRestServicePort(), + config.getKeyStorePath(), + config.getTrustStorePath(), + config.getKeyStorePwd(), + config.getTrustStorePwd(), + config.getIdleTimeoutInSeconds(), + config.isClientAuth()); + } else { + startNonSSL(config.getRestServicePort()); + } + } + + @Override + public void stop() { + try { + server.stop(); + } catch (Exception e) { + LOGGER.warn("RestService failed to stop: {}", e.getMessage()); + } finally { + server.destroy(); + } + } + + @Override + public ServiceType getID() { + return ServiceType.REST_SERVICE; + } + + public static MPPRestService getInstance() { + return MPPRestServiceHolder.INSTANCE; + } + + private static class MPPRestServiceHolder { + + private static final MPPRestService INSTANCE = new MPPRestService(); + + private MPPRestServiceHolder() {} + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/filter/ApiOriginFilter.java b/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/filter/ApiOriginFilter.java new file mode 100644 index 0000000000..d90886f85d --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/filter/ApiOriginFilter.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iotdb.db.protocol.mpprest.filter; + +import javax.servlet.FilterChain; +import javax.servlet.FilterConfig; +import javax.servlet.ServletException; +import javax.servlet.ServletRequest; +import javax.servlet.ServletResponse; +import javax.servlet.http.HttpServletResponse; + +import java.io.IOException; + +public class ApiOriginFilter implements javax.servlet.Filter { + @Override + public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) + throws IOException, ServletException { + HttpServletResponse res = (HttpServletResponse) response; + res.addHeader("Access-Control-Allow-Origin", "*"); + res.addHeader("Access-Control-Allow-Methods", "GET, POST"); + res.addHeader("Access-Control-Allow-Headers", "*"); + chain.doFilter(request, response); + } + + @Override + public void destroy() {} + + @Override + public void init(FilterConfig filterConfig) throws ServletException {} +} diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/filter/AuthorizationFilter.java b/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/filter/AuthorizationFilter.java new file mode 100644 index 0000000000..7cb4d8681e --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/filter/AuthorizationFilter.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.iotdb.db.protocol.mpprest.filter; + +import org.apache.iotdb.commons.auth.AuthException; +import org.apache.iotdb.commons.auth.authorizer.IAuthorizer; +import org.apache.iotdb.db.auth.AuthorizerManager; +import org.apache.iotdb.db.conf.rest.IoTDBRestServiceDescriptor; +import org.apache.iotdb.db.protocol.mpprest.model.ExecutionStatus; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.glassfish.jersey.internal.util.Base64; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.servlet.annotation.WebFilter; +import javax.ws.rs.container.ContainerRequestContext; +import javax.ws.rs.container.ContainerRequestFilter; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; +import javax.ws.rs.ext.Provider; + +import java.io.IOException; + +@WebFilter("/*") +@Provider +public class AuthorizationFilter implements ContainerRequestFilter { + + private static final Logger LOGGER = LoggerFactory.getLogger(AuthorizationFilter.class); + + private final IAuthorizer authorizer = AuthorizerManager.getInstance(); + private final UserCache userCache = UserCache.getInstance(); + + public AuthorizationFilter() throws AuthException {} + + @Override + public void filter(ContainerRequestContext containerRequestContext) throws IOException { + if ("OPTIONS".equals(containerRequestContext.getMethod()) + || "swagger.json".equals(containerRequestContext.getUriInfo().getPath()) + || "ping".equals(containerRequestContext.getUriInfo().getPath())) { + return; + } + + String authorizationHeader = containerRequestContext.getHeaderString("authorization"); + if (authorizationHeader == null) { + Response resp = + Response.status(Status.UNAUTHORIZED) + .type(MediaType.APPLICATION_JSON) + .entity( + new ExecutionStatus() + .code(TSStatusCode.UNINITIALIZED_AUTH_ERROR.getStatusCode()) + .message(TSStatusCode.UNINITIALIZED_AUTH_ERROR.name())) + .build(); + containerRequestContext.abortWith(resp); + return; + } + User user = userCache.getUser(authorizationHeader); + if (user == null) { + user = checkLogin(containerRequestContext, authorizationHeader); + if (user == null) { + return; + } else { + userCache.setUser(authorizationHeader, user); + } + } + + BasicSecurityContext basicSecurityContext = + new BasicSecurityContext( + user, IoTDBRestServiceDescriptor.getInstance().getConfig().isEnableHttps()); + containerRequestContext.setSecurityContext(basicSecurityContext); + } + + private User checkLogin( + ContainerRequestContext containerRequestContext, String authorizationHeader) { + + String decoded = Base64.decodeAsString(authorizationHeader.replace("Basic ", "")); + // todo: support special chars in username and password + String[] split = decoded.split(":"); + if (split.length != 2) { + Response resp = + Response.status(Status.BAD_REQUEST) + .type(MediaType.APPLICATION_JSON) + .entity( + new ExecutionStatus() + .code(TSStatusCode.SYSTEM_CHECK_ERROR.getStatusCode()) + .message(TSStatusCode.SYSTEM_CHECK_ERROR.name())) + .build(); + containerRequestContext.abortWith(resp); + return null; + } + + User user = new User(); + user.setUsername(split[0]); + user.setPassword(split[1]); + try { + if (!authorizer.login(split[0], split[1])) { + Response resp = + Response.status(Status.UNAUTHORIZED) + .type(MediaType.APPLICATION_JSON) + .entity( + new ExecutionStatus() + .code(TSStatusCode.WRONG_LOGIN_PASSWORD_ERROR.getStatusCode()) + .message(TSStatusCode.WRONG_LOGIN_PASSWORD_ERROR.name())) + .build(); + containerRequestContext.abortWith(resp); + return null; + } + } catch (AuthException e) { + LOGGER.warn(e.getMessage(), e); + Response resp = + Response.status(Status.INTERNAL_SERVER_ERROR) + .type(MediaType.APPLICATION_JSON) + .entity( + new ExecutionStatus() + .code(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()) + .message(e.getMessage())) + .build(); + containerRequestContext.abortWith(resp); + return null; + } + return user; + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/filter/BasicSecurityContext.java b/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/filter/BasicSecurityContext.java new file mode 100644 index 0000000000..7c6b3ca990 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/filter/BasicSecurityContext.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.iotdb.db.protocol.mpprest.filter; + +import javax.ws.rs.core.SecurityContext; + +import java.security.Principal; + +public class BasicSecurityContext implements SecurityContext { + + private final User user; + private final boolean secure; + + public BasicSecurityContext(User user, boolean secure) { + this.user = user; + this.secure = secure; + } + + @Override + public Principal getUserPrincipal() { + return user::getUsername; + } + + public User getUser() { + return user; + } + + @Override + public boolean isUserInRole(String role) { + return true; + } + + @Override + public boolean isSecure() { + return secure; + } + + @Override + public String getAuthenticationScheme() { + return BASIC_AUTH; + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/filter/User.java b/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/filter/User.java new file mode 100644 index 0000000000..3d5bd6594c --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/filter/User.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.iotdb.db.protocol.mpprest.filter; + +public class User { + private String username; + private String password; + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public String getUsername() { + return username; + } + + public void setUsername(String username) { + this.username = username; + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/filter/UserCache.java b/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/filter/UserCache.java new file mode 100644 index 0000000000..80022848bb --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/filter/UserCache.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.iotdb.db.protocol.mpprest.filter; + +import org.apache.iotdb.db.conf.rest.IoTDBRestServiceConfig; +import org.apache.iotdb.db.conf.rest.IoTDBRestServiceDescriptor; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; + +import java.util.concurrent.TimeUnit; + +public class UserCache { + private static final IoTDBRestServiceConfig CONFIG = + IoTDBRestServiceDescriptor.getInstance().getConfig(); + private final Cache<String, User> cache; + + private UserCache() { + cache = + Caffeine.newBuilder() + .initialCapacity(CONFIG.getCacheInitNum()) + .maximumSize(CONFIG.getCacheMaxNum()) + .expireAfterWrite(CONFIG.getCacheExpireInSeconds(), TimeUnit.SECONDS) + .build(); + } + + public static UserCache getInstance() { + return UserCacheHolder.INSTANCE; + } + + public User getUser(String key) { + return cache.getIfPresent(key); + } + + public void setUser(String key, User user) { + cache.put(key, user); + } + + private static class UserCacheHolder { + private static final UserCache INSTANCE = new UserCache(); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/handler/AuthorizationHandler.java b/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/handler/AuthorizationHandler.java new file mode 100644 index 0000000000..d5ee9d2e80 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/handler/AuthorizationHandler.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iotdb.db.protocol.mpprest.handler; + +import org.apache.iotdb.commons.auth.AuthException; +import org.apache.iotdb.db.auth.AuthorityChecker; +import org.apache.iotdb.db.mpp.plan.statement.Statement; +import org.apache.iotdb.db.protocol.mpprest.model.ExecutionStatus; +import org.apache.iotdb.rpc.TSStatusCode; + +import javax.ws.rs.core.Response; +import javax.ws.rs.core.SecurityContext; + +public class AuthorizationHandler { + + public Response checkAuthority(SecurityContext securityContext, Statement statement) { + String userName = securityContext.getUserPrincipal().getName(); + try { + if (!AuthorityChecker.checkAuthorization(statement, userName)) { + return Response.ok() + .entity( + new ExecutionStatus() + .code(TSStatusCode.NO_PERMISSION_ERROR.getStatusCode()) + .message(TSStatusCode.NO_PERMISSION_ERROR.name())) + .build(); + } + } catch (AuthException e) { + return Response.ok().entity(ExceptionHandler.tryCatchException(e)).build(); + } + return null; + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/handler/ExceptionHandler.java b/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/handler/ExceptionHandler.java new file mode 100644 index 0000000000..40e749d0d1 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/handler/ExceptionHandler.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iotdb.db.protocol.mpprest.handler; + +import org.apache.iotdb.commons.auth.AuthException; +import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.exception.IoTDBException; +import org.apache.iotdb.commons.exception.MetadataException; +import org.apache.iotdb.db.exception.StorageEngineException; +import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException; +import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.protocol.mpprest.model.ExecutionStatus; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.core.Response.Status; + +import java.io.IOException; + +public class ExceptionHandler { + private static final Logger LOGGER = LoggerFactory.getLogger(ExceptionHandler.class); + + private ExceptionHandler() {} + + public static ExecutionStatus tryCatchException(Exception e) { + ExecutionStatus responseResult = new ExecutionStatus(); + if (e instanceof QueryProcessException) { + responseResult.setMessage(e.getMessage()); + responseResult.setCode(((QueryProcessException) e).getErrorCode()); + } else if (e instanceof StorageGroupNotSetException) { + responseResult.setMessage(e.getMessage()); + responseResult.setCode(((StorageGroupNotSetException) e).getErrorCode()); + } else if (e instanceof StorageEngineException) { + responseResult.setMessage(e.getMessage()); + responseResult.setCode(((StorageEngineException) e).getErrorCode()); + } else if (e instanceof AuthException) { + responseResult.setMessage(e.getMessage()); + responseResult.setCode(Status.BAD_REQUEST.getStatusCode()); + } else if (e instanceof IllegalPathException) { + responseResult.setMessage(e.getMessage()); + responseResult.setCode(((IllegalPathException) e).getErrorCode()); + } else if (e instanceof MetadataException) { + responseResult.setMessage(e.getMessage()); + responseResult.setCode(((MetadataException) e).getErrorCode()); + } else if (e instanceof IoTDBException) { + responseResult.setMessage(e.getMessage()); + responseResult.setCode(((IoTDBException) e).getErrorCode()); + } else if (!(e instanceof IOException) && !(e instanceof NullPointerException)) { + responseResult.setMessage(e.getMessage()); + responseResult.setCode(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + } else { + responseResult.setMessage(e.getMessage()); + responseResult.setCode(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); + } + LOGGER.warn(e.getMessage(), e); + return responseResult; + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/handler/QueryDataSetHandler.java b/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/handler/QueryDataSetHandler.java new file mode 100644 index 0000000000..a75385aa49 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/handler/QueryDataSetHandler.java @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iotdb.db.protocol.mpprest.handler; + +import org.apache.iotdb.db.mpp.common.header.DatasetHeader; +import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution; +import org.apache.iotdb.db.mpp.plan.statement.Statement; +import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement; +import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowChildPathsStatement; +import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowStatement; +import org.apache.iotdb.db.protocol.mpprest.model.ExecutionStatus; +import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.read.common.block.TsBlock; +import org.apache.iotdb.tsfile.read.common.block.column.Column; +import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; + +import javax.ws.rs.core.Response; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +public class QueryDataSetHandler { + + private QueryDataSetHandler() {} + + /** + * @param actualRowSizeLimit max number of rows to return. no limit when actualRowSizeLimit <= 0. + */ + public static Response fillQueryDataSet( + IQueryExecution queryExecution, Statement statement, int actualRowSizeLimit) + throws IOException { + if (statement instanceof ShowStatement) { + return fillShowPlanDataSet(queryExecution, actualRowSizeLimit); + } else if (statement instanceof QueryStatement) { + if (((QueryStatement) statement).isAggregationQuery()) { + return fillAggregationPlanDataSet(queryExecution, actualRowSizeLimit); + } + return fillDataSetWithTimestamps(queryExecution, actualRowSizeLimit, 1); + } + return Response.ok() + .entity( + new ExecutionStatus() + .code(TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode()) + .message( + String.format( + "unsupported query data type: %s", statement.getType().toString()))) + .build(); + } + + public static Response fillDataSetWithTimestamps( + IQueryExecution queryExecution, final int actualRowSizeLimit, final long timePrecision) { + org.apache.iotdb.db.protocol.rest.model.QueryDataSet targetDataSet = + new org.apache.iotdb.db.protocol.rest.model.QueryDataSet(); + DatasetHeader header = queryExecution.getDatasetHeader(); + List<String> resultColumns = header.getRespColumns(); + for (String resultColumn : resultColumns) { + targetDataSet.addExpressionsItem(resultColumn); + targetDataSet.addValuesItem(new ArrayList<>()); + } + return fillQueryDataSetWithTimestamps( + queryExecution, actualRowSizeLimit, targetDataSet, timePrecision); + } + + public static Response fillAggregationPlanDataSet( + IQueryExecution queryExecution, final int actualRowSizeLimit) { + + org.apache.iotdb.db.protocol.rest.model.QueryDataSet targetDataSet = + new org.apache.iotdb.db.protocol.rest.model.QueryDataSet(); + + DatasetHeader datasetHeader = queryExecution.getDatasetHeader(); + + for (int i = 0; i < datasetHeader.getRespColumns().size(); i++) { + targetDataSet.addColumnNamesItem(datasetHeader.getRespColumns().get(i)); + targetDataSet.addValuesItem(new ArrayList<>()); + } + + return fillQueryDataSetWithoutTimestamps(queryExecution, actualRowSizeLimit, targetDataSet); + } + + private static Response fillShowPlanDataSet( + IQueryExecution queryExecution, final int actualRowSizeLimit) throws IOException { + org.apache.iotdb.db.protocol.rest.model.QueryDataSet targetDataSet = + new org.apache.iotdb.db.protocol.rest.model.QueryDataSet(); + initTargetDatasetOrderByOrderWithSourceDataSet( + queryExecution.getDatasetHeader(), targetDataSet); + + return fillQueryDataSetWithoutTimestamps(queryExecution, actualRowSizeLimit, targetDataSet); + } + + private static void initTargetDatasetOrderByOrderWithSourceDataSet( + DatasetHeader datasetHeader, + org.apache.iotdb.db.protocol.rest.model.QueryDataSet targetDataSet) { + if (datasetHeader.getRespColumns() != null) { + for (int i = 0; i < datasetHeader.getRespColumns().size(); i++) { + targetDataSet.addColumnNamesItem(datasetHeader.getRespColumns().get(i)); + targetDataSet.addValuesItem(new ArrayList<>()); + } + } + } + + private static void initTargetDatasetExpByOrderWithSourceDataSet( + QueryDataSet sourceDataSet, + int[] targetDataSetIndexToSourceDataSetIndex, + org.apache.iotdb.db.protocol.rest.model.QueryDataSet targetDataSet) { + if (sourceDataSet.getPaths() != null) { + for (int i = 0; i < sourceDataSet.getPaths().size(); i++) { + Path path = sourceDataSet.getPaths().get(i); + targetDataSet.addExpressionsItem(path.getFullPath()); + targetDataSet.addValuesItem(new ArrayList<>()); + targetDataSetIndexToSourceDataSetIndex[i] = i; + } + } + } + + private static Response fillQueryDataSetWithTimestamps( + IQueryExecution queryExecution, + int actualRowSizeLimit, + org.apache.iotdb.db.protocol.rest.model.QueryDataSet targetDataSet, + final long timePrecision) { + int fetched = 0; + int columnNum = queryExecution.getOutputValueColumnCount(); + while (fetched < actualRowSizeLimit) { + Optional<TsBlock> optionalTsBlock = queryExecution.getBatchResult(); + if (!optionalTsBlock.isPresent()) { + break; + } + TsBlock tsBlock = optionalTsBlock.get(); + int currentCount = tsBlock.getPositionCount(); + // time column + for (int i = 0; i < currentCount; i++) { + targetDataSet.addTimestampsItem( + timePrecision == 1 + ? tsBlock.getTimeByIndex(i) + : tsBlock.getTimeByIndex(i) / timePrecision); + } + for (int k = 0; k < columnNum; k++) { + Column column = tsBlock.getColumn(k); + List<Object> targetDataSetColumn = targetDataSet.getValues().get(k); + for (int i = 0; i < currentCount; i++) { + fetched++; + if (column.isNull(i)) { + targetDataSetColumn.add(null); + } else { + targetDataSetColumn.add( + column.getDataType().equals(TSDataType.TEXT) + ? column.getBinary(i).getStringValue() + : column.getObject(i)); + } + } + if (k != columnNum - 1) { + fetched -= currentCount; + } + } + } + return Response.ok().entity(targetDataSet).build(); + } + + private static Response fillQueryDataSetWithoutTimestamps( + IQueryExecution queryExecution, + int actualRowSizeLimit, + org.apache.iotdb.db.protocol.rest.model.QueryDataSet targetDataSet) { + int fetched = 0; + int columnNum = queryExecution.getOutputValueColumnCount(); + while (fetched < actualRowSizeLimit) { + Optional<TsBlock> optionalTsBlock = queryExecution.getBatchResult(); + if (!optionalTsBlock.isPresent()) { + break; + } + TsBlock tsBlock = optionalTsBlock.get(); + int currentCount = tsBlock.getPositionCount(); + for (int k = 0; k < columnNum; k++) { + Column column = tsBlock.getColumn(k); + List<Object> targetDataSetColumn = targetDataSet.getValues().get(k); + for (int i = 0; i < currentCount; i++) { + fetched++; + if (column.isNull(i)) { + targetDataSetColumn.add(null); + } else { + targetDataSetColumn.add( + column.getDataType().equals(TSDataType.TEXT) + ? column.getBinary(i).getStringValue() + : column.getObject(i)); + } + } + if (k != columnNum - 1) { + fetched -= currentCount; + } + } + } + return Response.ok().entity(targetDataSet).build(); + } + + public static Response fillGrafanaVariablesResult( + IQueryExecution queryExecution, Statement statement) { + List<String> results = new ArrayList<>(); + Optional<TsBlock> optionalTsBlock = queryExecution.getBatchResult(); + if (!optionalTsBlock.isPresent()) { + return Response.ok().entity(results).build(); + } + TsBlock tsBlock = optionalTsBlock.get(); + int currentCount = tsBlock.getPositionCount(); + Column column = tsBlock.getColumn(0); + + for (int i = 0; i < currentCount; i++) { + String nodePaths = column.getObject(i).toString(); + if (statement instanceof ShowChildPathsStatement) { + String[] nodeSubPath = nodePaths.split("\\."); + results.add(nodeSubPath[nodeSubPath.length - 1]); + } else { + results.add(nodePaths); + } + } + return Response.ok().entity(results).build(); + } + + public static Response fillGrafanaNodesResult(IQueryExecution queryExecution) throws IOException { + List<String> nodes = new ArrayList<>(); + Optional<TsBlock> optionalTsBlock = queryExecution.getBatchResult(); + if (!optionalTsBlock.isPresent()) { + return Response.ok().entity(nodes).build(); + } + TsBlock tsBlock = optionalTsBlock.get(); + int currentCount = tsBlock.getPositionCount(); + Column column = tsBlock.getColumn(0); + + for (int i = 0; i < currentCount; i++) { + String nodePaths = column.getObject(i).toString(); + String[] nodeSubPath = nodePaths.split("\\."); + nodes.add(nodeSubPath[nodeSubPath.length - 1]); + } + return Response.ok().entity(nodes).build(); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/handler/RequestValidationHandler.java b/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/handler/RequestValidationHandler.java new file mode 100644 index 0000000000..ca99130465 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/handler/RequestValidationHandler.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iotdb.db.protocol.mpprest.handler; + +import org.apache.iotdb.db.protocol.mpprest.model.ExpressionRequest; +import org.apache.iotdb.db.protocol.mpprest.model.InsertTabletRequest; +import org.apache.iotdb.db.protocol.mpprest.model.SQL; + +import org.apache.commons.lang3.Validate; + +import java.util.Objects; + +public class RequestValidationHandler { + + private RequestValidationHandler() {} + + public static void validateSQL(SQL sql) { + Objects.requireNonNull(sql.getSql(), "sql should not be null"); + if (sql.getRowLimit() != null) { + Validate.isTrue(sql.getRowLimit() > 0, "rowLimit should be positive"); + } + } + + public static void validateInsertTabletRequest(InsertTabletRequest insertTabletRequest) { + Objects.requireNonNull(insertTabletRequest.getTimestamps(), "timestamps should not be null"); + Objects.requireNonNull(insertTabletRequest.getIsAligned(), "isAligned should not be null"); + Objects.requireNonNull(insertTabletRequest.getDeviceId(), "deviceId should not be null"); + Objects.requireNonNull(insertTabletRequest.getDataTypes(), "dataTypes should not be null"); + Objects.requireNonNull(insertTabletRequest.getValues(), "values should not be null"); + } + + public static void validateExpressionRequest(ExpressionRequest expressionRequest) { + Objects.requireNonNull(expressionRequest.getExpression(), "expression should not be null"); + Objects.requireNonNull(expressionRequest.getPrefixPath(), "prefixPath should not be null"); + Objects.requireNonNull(expressionRequest.getStartTime(), "startTime should not be null"); + Objects.requireNonNull(expressionRequest.getEndTime(), "endTime should not be null"); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/handler/StatementConstructionHandler.java b/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/handler/StatementConstructionHandler.java new file mode 100644 index 0000000000..60ebe292ab --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/handler/StatementConstructionHandler.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iotdb.db.protocol.mpprest.handler; + +import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.exception.WriteProcessRejectException; +import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement; +import org.apache.iotdb.db.protocol.mpprest.model.InsertTabletRequest; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.utils.Binary; +import org.apache.iotdb.tsfile.utils.BitMap; + +import java.nio.charset.StandardCharsets; +import java.util.List; + +public class StatementConstructionHandler { + private StatementConstructionHandler() {} + + public static InsertTabletStatement constructInsertTabletStatement( + InsertTabletRequest insertTabletRequest) + throws IllegalPathException, WriteProcessRejectException { + // construct insert statement + InsertTabletStatement insertStatement = new InsertTabletStatement(); + insertStatement.setDevicePath(new PartialPath(insertTabletRequest.getDeviceId())); + insertStatement.setMeasurements(insertTabletRequest.getMeasurements().toArray(new String[0])); + List<List<Object>> rawData = insertTabletRequest.getValues(); + List<String> rawDataType = insertTabletRequest.getDataTypes(); + + int rowSize = insertTabletRequest.getTimestamps().size(); + int columnSize = rawDataType.size(); + + Object[] columns = new Object[columnSize]; + BitMap[] bitMaps = new BitMap[columnSize]; + TSDataType[] dataTypes = new TSDataType[columnSize]; + + for (int i = 0; i < columnSize; i++) { + dataTypes[i] = TSDataType.valueOf(rawDataType.get(i)); + } + + for (int columnIndex = 0; columnIndex < columnSize; columnIndex++) { + bitMaps[columnIndex] = new BitMap(rowSize); + switch (dataTypes[columnIndex]) { + case BOOLEAN: + boolean[] booleanValues = new boolean[rowSize]; + for (int rowIndex = 0; rowIndex < rowSize; rowIndex++) { + if (rawData.get(columnIndex).get(rowIndex) == null) { + bitMaps[columnIndex].mark(rowIndex); + } else { + booleanValues[rowIndex] = (Boolean) rawData.get(columnIndex).get(rowIndex); + } + } + columns[columnIndex] = booleanValues; + break; + case INT32: + int[] intValues = new int[rowSize]; + for (int rowIndex = 0; rowIndex < rowSize; rowIndex++) { + Object object = rawData.get(columnIndex).get(rowIndex); + if (object == null) { + bitMaps[columnIndex].mark(rowIndex); + } else if (object instanceof Integer) { + intValues[rowIndex] = (int) object; + } else { + throw new WriteProcessRejectException( + "unsupported data type: " + object.getClass().toString()); + } + } + columns[columnIndex] = intValues; + break; + case INT64: + long[] longValues = new long[rowSize]; + for (int rowIndex = 0; rowIndex < rowSize; rowIndex++) { + Object object = rawData.get(columnIndex).get(rowIndex); + if (object == null) { + bitMaps[columnIndex].mark(rowIndex); + } else if (object instanceof Integer) { + longValues[rowIndex] = (int) object; + } else if (object instanceof Long) { + longValues[rowIndex] = (long) object; + } else { + throw new WriteProcessRejectException( + "unsupported data type: " + object.getClass().toString()); + } + } + columns[columnIndex] = longValues; + break; + case FLOAT: + float[] floatValues = new float[rowSize]; + for (int rowIndex = 0; rowIndex < rowSize; rowIndex++) { + if (rawData.get(columnIndex).get(rowIndex) == null) { + bitMaps[columnIndex].mark(rowIndex); + } else { + floatValues[rowIndex] = + ((Double) rawData.get(columnIndex).get(rowIndex)).floatValue(); + } + } + columns[columnIndex] = floatValues; + break; + case DOUBLE: + double[] doubleValues = new double[rowSize]; + for (int rowIndex = 0; rowIndex < rowSize; rowIndex++) { + if (rawData.get(columnIndex).get(rowIndex) == null) { + bitMaps[columnIndex].mark(rowIndex); + } else { + doubleValues[rowIndex] = (double) rawData.get(columnIndex).get(rowIndex); + } + } + columns[columnIndex] = doubleValues; + break; + case TEXT: + Binary[] binaryValues = new Binary[rowSize]; + for (int rowIndex = 0; rowIndex < rowSize; rowIndex++) { + if (rawData.get(columnIndex).get(rowIndex) == null) { + bitMaps[columnIndex].mark(rowIndex); + binaryValues[rowIndex] = new Binary("".getBytes(StandardCharsets.UTF_8)); + } else { + binaryValues[rowIndex] = + new Binary( + rawData + .get(columnIndex) + .get(rowIndex) + .toString() + .getBytes(StandardCharsets.UTF_8)); + } + } + columns[columnIndex] = binaryValues; + break; + default: + throw new IllegalArgumentException("Invalid input: " + rawDataType.get(columnIndex)); + } + } + + insertStatement.setTimes( + insertTabletRequest.getTimestamps().stream().mapToLong(Long::longValue).toArray()); + insertStatement.setColumns(columns); + insertStatement.setBitMaps(bitMaps); + insertStatement.setRowCount(insertTabletRequest.getTimestamps().size()); + insertStatement.setDataTypes(dataTypes); + insertStatement.setAligned(insertTabletRequest.getIsAligned()); + return insertStatement; + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/impl/GrafanaApiServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/impl/GrafanaApiServiceImpl.java new file mode 100644 index 0000000000..3ab3779fc4 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/impl/GrafanaApiServiceImpl.java @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iotdb.db.protocol.mpprest.impl; + +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.mpp.plan.Coordinator; +import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher; +import org.apache.iotdb.db.mpp.plan.analyze.ClusterSchemaFetcher; +import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher; +import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher; +import org.apache.iotdb.db.mpp.plan.analyze.StandalonePartitionFetcher; +import org.apache.iotdb.db.mpp.plan.analyze.StandaloneSchemaFetcher; +import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult; +import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution; +import org.apache.iotdb.db.mpp.plan.parser.StatementGenerator; +import org.apache.iotdb.db.mpp.plan.statement.Statement; +import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement; +import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowStatement; +import org.apache.iotdb.db.protocol.mpprest.GrafanaApiService; +import org.apache.iotdb.db.protocol.mpprest.NotFoundException; +import org.apache.iotdb.db.protocol.mpprest.handler.AuthorizationHandler; +import org.apache.iotdb.db.protocol.mpprest.handler.ExceptionHandler; +import org.apache.iotdb.db.protocol.mpprest.handler.QueryDataSetHandler; +import org.apache.iotdb.db.protocol.mpprest.handler.RequestValidationHandler; +import org.apache.iotdb.db.protocol.mpprest.model.ExecutionStatus; +import org.apache.iotdb.db.protocol.mpprest.model.ExpressionRequest; +import org.apache.iotdb.db.protocol.mpprest.model.SQL; +import org.apache.iotdb.db.query.control.SessionManager; +import org.apache.iotdb.rpc.TSStatusCode; + +import com.google.common.base.Joiner; +import io.airlift.concurrent.SetThreadName; +import org.apache.commons.lang3.StringUtils; + +import javax.ws.rs.core.Response; +import javax.ws.rs.core.SecurityContext; + +import java.time.ZoneId; +import java.util.List; + +public class GrafanaApiServiceImpl extends GrafanaApiService { + + private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); + + private static final Coordinator COORDINATOR = Coordinator.getInstance(); + + private static final SessionManager SESSION_MANAGER = SessionManager.getInstance(); + + private final IPartitionFetcher PARTITION_FETCHER; + + private final ISchemaFetcher SCHEMA_FETCHER; + private final AuthorizationHandler authorizationHandler; + + private final long timePrecision; // the default timestamp precision is ms + + public GrafanaApiServiceImpl() { + if (config.isClusterMode()) { + PARTITION_FETCHER = ClusterPartitionFetcher.getInstance(); + SCHEMA_FETCHER = ClusterSchemaFetcher.getInstance(); + } else { + PARTITION_FETCHER = StandalonePartitionFetcher.getInstance(); + SCHEMA_FETCHER = StandaloneSchemaFetcher.getInstance(); + } + authorizationHandler = new AuthorizationHandler(); + + switch (IoTDBDescriptor.getInstance().getConfig().getTimestampPrecision()) { + case "ns": + timePrecision = 1000000; + break; + case "us": + timePrecision = 1000; + break; + default: + timePrecision = 1; + } + } + + @Override + public Response variables(SQL sql, SecurityContext securityContext) { + try { + RequestValidationHandler.validateSQL(sql); + + Statement statement = + StatementGenerator.createStatement(sql.getSql(), ZoneId.systemDefault()); + if (!(statement instanceof ShowStatement) && !(statement instanceof QueryStatement)) { + return Response.ok() + .entity( + new ExecutionStatus() + .code(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) + .message(TSStatusCode.EXECUTE_STATEMENT_ERROR.name())) + .build(); + } + + Response response = authorizationHandler.checkAuthority(securityContext, statement); + if (response != null) { + return response; + } + + final long queryId = SESSION_MANAGER.requestQueryId(true); + // create and cache dataset + ExecutionResult result = + COORDINATOR.execute( + statement, queryId, null, sql.getSql(), PARTITION_FETCHER, SCHEMA_FETCHER); + if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return Response.ok() + .entity( + new ExecutionStatus() + .code(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) + .message(TSStatusCode.EXECUTE_STATEMENT_ERROR.name())) + .build(); + } + IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId); + try (SetThreadName threadName = new SetThreadName(result.queryId.getId())) { + return QueryDataSetHandler.fillGrafanaVariablesResult(queryExecution, statement); + } + } catch (Exception e) { + return Response.ok().entity(ExceptionHandler.tryCatchException(e)).build(); + } + } + + @Override + public Response expression(ExpressionRequest expressionRequest, SecurityContext securityContext) + throws NotFoundException { + try { + RequestValidationHandler.validateExpressionRequest(expressionRequest); + + final String expression = Joiner.on(",").join(expressionRequest.getExpression()); + final String prefixPaths = Joiner.on(",").join(expressionRequest.getPrefixPath()); + final long startTime = + (long) (expressionRequest.getStartTime().doubleValue() * timePrecision); + final long endTime = (long) (expressionRequest.getEndTime().doubleValue() * timePrecision); + String sql = + "select " + + expression + + " from " + + prefixPaths + + " where timestamp>=" + + startTime + + " and timestamp<= " + + endTime; + if (StringUtils.isNotEmpty(expressionRequest.getCondition())) { + sql += " and " + expressionRequest.getCondition(); + } + if (StringUtils.isNotEmpty(expressionRequest.getControl())) { + sql += " " + expressionRequest.getControl(); + } + + Statement statement = StatementGenerator.createStatement(sql, ZoneId.systemDefault()); + + Response response = authorizationHandler.checkAuthority(securityContext, statement); + if (response != null) { + return response; + } + + final long queryId = SESSION_MANAGER.requestQueryId(true); + // create and cache dataset + ExecutionResult result = + COORDINATOR.execute(statement, queryId, null, sql, PARTITION_FETCHER, SCHEMA_FETCHER); + if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return Response.ok() + .entity( + new ExecutionStatus() + .code(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) + .message(TSStatusCode.EXECUTE_STATEMENT_ERROR.name())) + .build(); + } + IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId); + try (SetThreadName threadName = new SetThreadName(result.queryId.getId())) { + if (((QueryStatement) statement).isGroupByLevel()) { + return QueryDataSetHandler.fillAggregationPlanDataSet(queryExecution, 0); + } else { + return QueryDataSetHandler.fillDataSetWithTimestamps(queryExecution, 0, timePrecision); + } + } + } catch (Exception e) { + return Response.ok().entity(ExceptionHandler.tryCatchException(e)).build(); + } + } + + @Override + public Response login(SecurityContext securityContext) throws NotFoundException { + return Response.ok() + .entity( + new ExecutionStatus() + .code(TSStatusCode.SUCCESS_STATUS.getStatusCode()) + .message(TSStatusCode.SUCCESS_STATUS.name())) + .build(); + } + + @Override + public Response node(List<String> requestBody, SecurityContext securityContext) + throws NotFoundException { + try { + if (requestBody != null && requestBody.size() > 0) { + PartialPath path = new PartialPath(Joiner.on(".").join(requestBody)); + String sql = "show child paths " + path; + Statement statement = StatementGenerator.createStatement(sql, ZoneId.systemDefault()); + + Response response = authorizationHandler.checkAuthority(securityContext, statement); + if (response != null) { + return response; + } + + final long queryId = SESSION_MANAGER.requestQueryId(true); + // create and cache dataset + ExecutionResult result = + COORDINATOR.execute(statement, queryId, null, sql, PARTITION_FETCHER, SCHEMA_FETCHER); + if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return Response.ok() + .entity( + new ExecutionStatus() + .code(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) + .message(TSStatusCode.EXECUTE_STATEMENT_ERROR.name())) + .build(); + } + IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId); + + try (SetThreadName threadName = new SetThreadName(result.queryId.getId())) { + return QueryDataSetHandler.fillGrafanaNodesResult(queryExecution); + } + } else { + return QueryDataSetHandler.fillGrafanaNodesResult(null); + } + } catch (Exception e) { + return Response.ok().entity(ExceptionHandler.tryCatchException(e)).build(); + } + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/impl/PingApiServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/impl/PingApiServiceImpl.java new file mode 100644 index 0000000000..0c970a45c4 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/impl/PingApiServiceImpl.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iotdb.db.protocol.mpprest.impl; + +import org.apache.iotdb.commons.service.ThriftService; +import org.apache.iotdb.db.protocol.mpprest.PingApiService; +import org.apache.iotdb.db.protocol.mpprest.model.ExecutionStatus; +import org.apache.iotdb.db.service.RPCService; +import org.apache.iotdb.rpc.TSStatusCode; + +import javax.ws.rs.core.Response; +import javax.ws.rs.core.SecurityContext; + +public class PingApiServiceImpl extends PingApiService { + + private static final String UNAVAILABLE_SERVICE = "thrift service is unavailable"; + + @Override + public Response tryPing(SecurityContext securityContext) { + if (RPCService.getInstance().getRPCServiceStatus().equals(ThriftService.STATUS_DOWN)) { + return Response.status(Response.Status.SERVICE_UNAVAILABLE) + .entity( + new ExecutionStatus() + .code(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()) + .message(UNAVAILABLE_SERVICE)) + .build(); + } + + return Response.ok() + .entity( + new ExecutionStatus() + .code(TSStatusCode.SUCCESS_STATUS.getStatusCode()) + .message(TSStatusCode.SUCCESS_STATUS.name())) + .build(); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/impl/RestApiServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/impl/RestApiServiceImpl.java new file mode 100644 index 0000000000..f0fa2b0c04 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/impl/RestApiServiceImpl.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iotdb.db.protocol.mpprest.impl; + +import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.conf.rest.IoTDBRestServiceDescriptor; +import org.apache.iotdb.db.mpp.plan.Coordinator; +import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher; +import org.apache.iotdb.db.mpp.plan.analyze.ClusterSchemaFetcher; +import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher; +import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher; +import org.apache.iotdb.db.mpp.plan.analyze.StandalonePartitionFetcher; +import org.apache.iotdb.db.mpp.plan.analyze.StandaloneSchemaFetcher; +import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult; +import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution; +import org.apache.iotdb.db.mpp.plan.parser.StatementGenerator; +import org.apache.iotdb.db.mpp.plan.statement.Statement; +import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement; +import org.apache.iotdb.db.protocol.mpprest.RestApiService; +import org.apache.iotdb.db.protocol.mpprest.handler.AuthorizationHandler; +import org.apache.iotdb.db.protocol.mpprest.handler.ExceptionHandler; +import org.apache.iotdb.db.protocol.mpprest.handler.QueryDataSetHandler; +import org.apache.iotdb.db.protocol.mpprest.handler.RequestValidationHandler; +import org.apache.iotdb.db.protocol.mpprest.handler.StatementConstructionHandler; +import org.apache.iotdb.db.protocol.mpprest.model.ExecutionStatus; +import org.apache.iotdb.db.protocol.mpprest.model.InsertTabletRequest; +import org.apache.iotdb.db.protocol.mpprest.model.SQL; +import org.apache.iotdb.db.query.control.SessionManager; +import org.apache.iotdb.rpc.TSStatusCode; + +import io.airlift.concurrent.SetThreadName; + +import javax.ws.rs.core.Response; +import javax.ws.rs.core.SecurityContext; + +import java.time.ZoneId; + +public class RestApiServiceImpl extends RestApiService { + + private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); + + private static final Coordinator COORDINATOR = Coordinator.getInstance(); + + private static final SessionManager SESSION_MANAGER = SessionManager.getInstance(); + + private final IPartitionFetcher PARTITION_FETCHER; + + private final ISchemaFetcher SCHEMA_FETCHER; + private final AuthorizationHandler authorizationHandler; + + private final Integer defaultQueryRowLimit; + + public RestApiServiceImpl() { + if (config.isClusterMode()) { + PARTITION_FETCHER = ClusterPartitionFetcher.getInstance(); + SCHEMA_FETCHER = ClusterSchemaFetcher.getInstance(); + } else { + PARTITION_FETCHER = StandalonePartitionFetcher.getInstance(); + SCHEMA_FETCHER = StandaloneSchemaFetcher.getInstance(); + } + authorizationHandler = new AuthorizationHandler(); + defaultQueryRowLimit = + IoTDBRestServiceDescriptor.getInstance().getConfig().getRestQueryDefaultRowSizeLimit(); + } + + @Override + public Response executeNonQueryStatement(SQL sql, SecurityContext securityContext) { + try { + RequestValidationHandler.validateSQL(sql); + + Statement statement = + StatementGenerator.createStatement(sql.getSql(), ZoneId.systemDefault()); + Response response = authorizationHandler.checkAuthority(securityContext, statement); + if (response != null) { + return response; + } + ExecutionResult result = + COORDINATOR.execute( + statement, + SESSION_MANAGER.requestQueryId(false), + null, + sql.getSql(), + PARTITION_FETCHER, + SCHEMA_FETCHER); + + return Response.ok() + .entity( + result.status.code == TSStatusCode.SUCCESS_STATUS.getStatusCode() + ? new ExecutionStatus() + .code(TSStatusCode.SUCCESS_STATUS.getStatusCode()) + .message(TSStatusCode.SUCCESS_STATUS.name()) + : new ExecutionStatus() + .code(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) + .message(TSStatusCode.EXECUTE_STATEMENT_ERROR.name())) + .build(); + } catch (Exception e) { + return Response.ok().entity(ExceptionHandler.tryCatchException(e)).build(); + } + } + + @Override + public Response executeQueryStatement(SQL sql, SecurityContext securityContext) { + try { + RequestValidationHandler.validateSQL(sql); + + Statement statement = + StatementGenerator.createStatement(sql.getSql(), ZoneId.systemDefault()); + Response response = authorizationHandler.checkAuthority(securityContext, statement); + if (response != null) { + return response; + } + + final long queryId = SESSION_MANAGER.requestQueryId(true); + // create and cache dataset + ExecutionResult result = + COORDINATOR.execute( + statement, queryId, null, sql.getSql(), PARTITION_FETCHER, SCHEMA_FETCHER); + if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return Response.ok() + .entity( + new ExecutionStatus() + .code(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) + .message(TSStatusCode.EXECUTE_STATEMENT_ERROR.name())) + .build(); + } + IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId); + + try (SetThreadName threadName = new SetThreadName(result.queryId.getId())) { + return QueryDataSetHandler.fillQueryDataSet( + queryExecution, + statement, + sql.getRowLimit() == null ? defaultQueryRowLimit : sql.getRowLimit()); + } + } catch (Exception e) { + return Response.ok().entity(ExceptionHandler.tryCatchException(e)).build(); + } + } + + @Override + public Response insertTablet( + InsertTabletRequest insertTabletRequest, SecurityContext securityContext) { + try { + RequestValidationHandler.validateInsertTabletRequest(insertTabletRequest); + + InsertTabletStatement insertTabletStatement = + StatementConstructionHandler.constructInsertTabletStatement(insertTabletRequest); + + Response response = + authorizationHandler.checkAuthority(securityContext, insertTabletStatement); + if (response != null) { + return response; + } + + ExecutionResult result = + COORDINATOR.execute( + insertTabletStatement, + SESSION_MANAGER.requestQueryId(false), + null, + "", + PARTITION_FETCHER, + SCHEMA_FETCHER); + + return Response.ok() + .entity( + result.status.code == TSStatusCode.SUCCESS_STATUS.getStatusCode() + ? new ExecutionStatus() + .code(TSStatusCode.SUCCESS_STATUS.getStatusCode()) + .message(TSStatusCode.SUCCESS_STATUS.name()) + : new ExecutionStatus() + .code(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) + .message(TSStatusCode.EXECUTE_STATEMENT_ERROR.name())) + .build(); + } catch (Exception e) { + return Response.ok().entity(ExceptionHandler.tryCatchException(e)).build(); + } + } +}
