This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch ty/TableModelGrammar in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d0584cd8e69ede4c86328ff197af1557fdd57f32 Author: JackieTien97 <[email protected]> AuthorDate: Sun Apr 7 16:24:35 2024 +0800 support sql_dialect in cli and session --- .../java/org/apache/iotdb/cli/AbstractCli.java | 16 ++ .../src/main/java/org/apache/iotdb/cli/Cli.java | 4 + .../org/apache/iotdb/isession/SessionConfig.java | 2 + .../main/java/org/apache/iotdb/jdbc/Config.java | 2 + .../org/apache/iotdb/jdbc/IoTDBConnection.java | 3 +- .../apache/iotdb/jdbc/IoTDBConnectionParams.java | 10 ++ .../src/main/java/org/apache/iotdb/jdbc/Utils.java | 5 + .../java/org/apache/iotdb/rpc/TSStatusCode.java | 2 + .../java/org/apache/iotdb/session/Session.java | 14 +- .../apache/iotdb/session/SessionConnection.java | 12 +- .../org/apache/iotdb/session/pool/SessionPool.java | 12 ++ .../session/subscription/SubscriptionSession.java | 4 +- .../SubscriptionSessionConnection.java | 10 +- .../iotdb/session/SessionConnectionTest.java | 6 +- .../iotdb/db/protocol/session/IClientSession.java | 25 +++- .../iotdb/db/protocol/session/SessionManager.java | 26 +++- .../protocol/thrift/impl/ClientRPCServiceImpl.java | 164 +++++++++++++++++---- .../iotdb/db/queryengine/common/SessionInfo.java | 18 ++- .../iotdb/db/queryengine/plan/Coordinator.java | 76 ++++++++++ .../execution/config/TableConfigTaskVisitor.java | 99 +++++++++++++ .../config/executor/ClusterConfigTaskExecutor.java | 35 +++++ .../config/executor/IConfigTaskExecutor.java | 18 +++ .../metadata/relational/CreateTableTask.java | 42 ++++++ .../config/metadata/relational/UseDBTask.java | 46 ++++++ .../plan/relational/analyzer/Analyzer.java | 2 +- .../relational/planner/RelationalModelPlanner.java | 66 ++++++++- 26 files changed, 661 insertions(+), 58 deletions(-) diff --git a/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java b/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java index 18bb7fa538d..12a4d311f8a 100644 --- a/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java +++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java @@ -47,6 +47,8 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import static org.apache.iotdb.jdbc.Config.SQL_DIALECT; + public abstract class AbstractCli { static final String HOST_ARGS = "h"; @@ -137,6 +139,8 @@ public abstract class AbstractCli { static int lastProcessStatus = CODE_OK; + static String sqlDialect = "tree"; + static void init() { keywordSet.add("-" + HOST_ARGS); keywordSet.add("-" + HELP_ARGS); @@ -238,6 +242,14 @@ public abstract class AbstractCli { + "Using the configuration of server if it's not set (optional)") .build(); options.addOption(queryTimeout); + + Option sqlDialect = + Option.builder(SQL_DIALECT) + .argName(SQL_DIALECT) + .hasArg() + .desc("currently support tree and table, using tree if it's not set (optional)") + .build(); + options.addOption(sqlDialect); return options; } @@ -305,6 +317,10 @@ public abstract class AbstractCli { } } + static void setSqlDialect(String sqlDialect) { + AbstractCli.sqlDialect = sqlDialect; + } + static String[] removePasswordArgs(String[] args) { int index = -1; for (int i = 0; i < args.length; i++) { diff --git a/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/Cli.java b/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/Cli.java index bca535ad3bb..221a0b7f9db 100644 --- a/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/Cli.java +++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/Cli.java @@ -109,6 +109,7 @@ public class Cli extends AbstractCli { } info.setProperty("user", username); info.setProperty("password", password); + info.setProperty(Config.SQL_DIALECT, sqlDialect); } private static boolean parseCommandLine( @@ -129,6 +130,9 @@ public class Cli extends AbstractCli { if (commandLine.hasOption(TIMEOUT_ARGS)) { setQueryTimeout(commandLine.getOptionValue(TIMEOUT_ARGS)); } + if (commandLine.hasOption(Config.SQL_DIALECT)) { + setSqlDialect(commandLine.getOptionValue(Config.SQL_DIALECT)); + } } catch (ParseException e) { ctx.getPrinter() .println( diff --git a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionConfig.java b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionConfig.java index e43b771d3c6..9cc01fd2631 100644 --- a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionConfig.java +++ b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionConfig.java @@ -53,5 +53,7 @@ public class SessionConfig { public static final long RETRY_INTERVAL_IN_MS = 500; + public static final String SQL_DIALECT = "tree"; + private SessionConfig() {} } diff --git a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/Config.java b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/Config.java index adb0125dbc9..6c8f0f7efa4 100644 --- a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/Config.java +++ b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/Config.java @@ -77,4 +77,6 @@ public class Config { public static final String TRUST_STORE = "trust_store"; public static final String TRUST_STORE_PWD = "trust_store_pwd"; + + public static final String SQL_DIALECT = "sql_dialect"; } diff --git a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java index 9f8142fb1b3..e472ef79162 100644 --- a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java +++ b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java @@ -492,7 +492,8 @@ public class IoTDBConnection implements Connection { openReq.setUsername(params.getUsername()); openReq.setPassword(params.getPassword()); openReq.setZoneId(getTimeZone()); - openReq.putToConfiguration("version", params.getVersion().toString()); + openReq.putToConfiguration(Config.VERSION, params.getVersion().toString()); + openReq.putToConfiguration(Config.SQL_DIALECT, params.getSqlDialect()); TSOpenSessionResp openResp = null; try { diff --git a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnectionParams.java b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnectionParams.java index d1523691f79..389202536ba 100644 --- a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnectionParams.java +++ b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnectionParams.java @@ -45,6 +45,8 @@ public class IoTDBConnectionParams { private String trustStore; private String trustStorePwd; + private String sqlDialect = "tree"; + public IoTDBConnectionParams(String url) { this.jdbcUriString = url; } @@ -164,4 +166,12 @@ public class IoTDBConnectionParams { public void setTrustStorePwd(String trustStorePwd) { this.trustStorePwd = trustStorePwd; } + + public String getSqlDialect() { + return sqlDialect; + } + + public void setSqlDialect(String sqlDialect) { + this.sqlDialect = sqlDialect; + } } diff --git a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/Utils.java b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/Utils.java index af884d3e781..281c3f10c17 100644 --- a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/Utils.java +++ b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/Utils.java @@ -117,6 +117,10 @@ public class Utils { params.setTrustStorePwd(info.getProperty(Config.TRUST_STORE_PWD)); } + if (info.containsKey(Config.SQL_DIALECT)) { + params.setSqlDialect(info.getProperty(Config.SQL_DIALECT)); + } + return params; } @@ -154,6 +158,7 @@ public class Utils { case Config.TRUST_STORE_PWD: case Config.VERSION: case Config.NETWORK_TIMEOUT: + case Config.SQL_DIALECT: info.put(key, value); break; case Config.TIME_ZONE: diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java index 67c4278c92e..ba6a7f06681 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java @@ -38,6 +38,8 @@ public enum TSStatusCode { START_UP_ERROR(203), SHUT_DOWN_ERROR(204), + UNSUPPORTED_SQL_DIALECT(205), + // General Error UNSUPPORTED_OPERATION(300), EXECUTE_STATEMENT_ERROR(301), diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java index b87d8937a1e..c808962421e 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java @@ -172,6 +172,8 @@ public class Session implements ISession { protected long retryIntervalInMs = SessionConfig.RETRY_INTERVAL_IN_MS; + protected String sqlDialect = SessionConfig.SQL_DIALECT; + private static final String REDIRECT_TWICE = "redirect twice"; private static final String REDIRECT_TWICE_RETRY = "redirect twice, please try again."; @@ -434,6 +436,7 @@ public class Session implements ISession { this.enableAutoFetch = builder.enableAutoFetch; this.maxRetryCount = builder.maxRetryCount; this.retryIntervalInMs = builder.retryIntervalInMs; + this.sqlDialect = builder.sqlDialect; } @Override @@ -594,10 +597,10 @@ public class Session implements ISession { Session session, TEndPoint endpoint, ZoneId zoneId) throws IoTDBConnectionException { if (endpoint == null) { return new SessionConnection( - session, zoneId, availableNodes, maxRetryCount, retryIntervalInMs); + session, zoneId, availableNodes, maxRetryCount, retryIntervalInMs, sqlDialect); } return new SessionConnection( - session, endpoint, zoneId, availableNodes, maxRetryCount, retryIntervalInMs); + session, endpoint, zoneId, availableNodes, maxRetryCount, retryIntervalInMs, sqlDialect); } @Override @@ -3805,6 +3808,8 @@ public class Session implements ISession { private long retryIntervalInMs = SessionConfig.RETRY_INTERVAL_IN_MS; + private String sqlDialect = SessionConfig.SQL_DIALECT; + public Builder useSSL(boolean useSSL) { this.useSSL = useSSL; return this; @@ -3902,6 +3907,11 @@ public class Session implements ISession { return this; } + public Builder sqlDialect(String sqlDialect) { + this.sqlDialect = sqlDialect; + return this; + } + public Session build() { if (nodeUrls != null && (!SessionConfig.DEFAULT_HOST.equals(host) || rpcPort != SessionConfig.DEFAULT_PORT)) { diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java index 993626762d0..0ab41f30617 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java @@ -105,11 +105,14 @@ public class SessionConnection { private final long retryIntervalInMs; + private final String sqlDialect; + // TestOnly public SessionConnection() { availableNodes = Collections::emptyList; this.maxRetryCount = Math.max(0, SessionConfig.MAX_RETRY_COUNT); this.retryIntervalInMs = Math.max(0, SessionConfig.RETRY_INTERVAL_IN_MS); + this.sqlDialect = "tree"; } public SessionConnection( @@ -118,7 +121,8 @@ public class SessionConnection { ZoneId zoneId, Supplier<List<TEndPoint>> availableNodes, int maxRetryCount, - long retryIntervalInMs) + long retryIntervalInMs, + String sqlDialect) throws IoTDBConnectionException { this.session = session; this.endPoint = endPoint; @@ -127,6 +131,7 @@ public class SessionConnection { this.availableNodes = availableNodes; this.maxRetryCount = Math.max(0, maxRetryCount); this.retryIntervalInMs = Math.max(0, retryIntervalInMs); + this.sqlDialect = sqlDialect; try { init(endPoint, session.useSSL, session.trustStore, session.trustStorePwd); } catch (StatementExecutionException e) { @@ -141,7 +146,8 @@ public class SessionConnection { ZoneId zoneId, Supplier<List<TEndPoint>> availableNodes, int maxRetryCount, - long retryIntervalInMs) + long retryIntervalInMs, + String sqlDialect) throws IoTDBConnectionException { this.session = session; this.zoneId = zoneId == null ? ZoneId.systemDefault() : zoneId; @@ -149,6 +155,7 @@ public class SessionConnection { this.availableNodes = availableNodes; this.maxRetryCount = Math.max(0, maxRetryCount); this.retryIntervalInMs = Math.max(0, retryIntervalInMs); + this.sqlDialect = sqlDialect; initClusterConn(); } @@ -190,6 +197,7 @@ public class SessionConnection { openReq.setPassword(session.password); openReq.setZoneId(zoneId.toString()); openReq.putToConfiguration("version", session.version.toString()); + openReq.putToConfiguration("sql_dialect", sqlDialect); try { TSOpenSessionResp openResp = client.openSession(openReq); diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java index 6db8113d7ba..92d0f10fbc0 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java @@ -153,6 +153,8 @@ public class SessionPool implements ISessionPool { protected long retryIntervalInMs = SessionConfig.RETRY_INTERVAL_IN_MS; + protected String sqlDialect = SessionConfig.SQL_DIALECT; + private static final String INSERT_RECORD_FAIL = "insertRecord failed"; private static final String INSERT_RECORD_ERROR_MSG = "unexpected error in insertRecord"; @@ -487,6 +489,7 @@ public class SessionPool implements ISessionPool { this.trustStorePwd = builder.trustStorePwd; this.maxRetryCount = builder.maxRetryCount; this.retryIntervalInMs = builder.retryIntervalInMs; + this.sqlDialect = builder.sqlDialect; if (enableAutoFetch) { initThreadPool(); @@ -541,6 +544,7 @@ public class SessionPool implements ISessionPool { .trustStorePwd(trustStorePwd) .maxRetryCount(maxRetryCount) .retryIntervalInMs(retryIntervalInMs) + .sqlDialect(sqlDialect) .build(); } else { // Construct redirect-able Session @@ -561,6 +565,7 @@ public class SessionPool implements ISessionPool { .trustStorePwd(trustStorePwd) .maxRetryCount(maxRetryCount) .retryIntervalInMs(retryIntervalInMs) + .sqlDialect(sqlDialect) .build(); } session.setEnableQueryRedirection(enableQueryRedirection); @@ -3524,6 +3529,8 @@ public class SessionPool implements ISessionPool { private long retryIntervalInMs = SessionConfig.RETRY_INTERVAL_IN_MS; + private String sqlDialect = SessionConfig.SQL_DIALECT; + public Builder useSSL(boolean useSSL) { this.useSSL = useSSL; return this; @@ -3634,6 +3641,11 @@ public class SessionPool implements ISessionPool { return this; } + public Builder sqlDialect(String sqlDialect) { + this.sqlDialect = sqlDialect; + return this; + } + public SessionPool build() { return new SessionPool(this); } diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java index 8346277e2b5..15611a1c5ce 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java @@ -63,10 +63,10 @@ public class SubscriptionSession extends Session { Session session, TEndPoint endpoint, ZoneId zoneId) throws IoTDBConnectionException { if (endpoint == null) { return new SubscriptionSessionConnection( - session, zoneId, availableNodes, maxRetryCount, retryIntervalInMs); + session, zoneId, availableNodes, maxRetryCount, retryIntervalInMs, sqlDialect); } return new SubscriptionSessionConnection( - session, endpoint, zoneId, availableNodes, maxRetryCount, retryIntervalInMs); + session, endpoint, zoneId, availableNodes, maxRetryCount, retryIntervalInMs, sqlDialect); } /////////////////////////////// topic /////////////////////////////// diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionConnection.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionConnection.java index b896e7cb189..6021caae8b5 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionConnection.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionConnection.java @@ -65,9 +65,10 @@ public class SubscriptionSessionConnection extends SessionConnection { ZoneId zoneId, Supplier<List<TEndPoint>> availableNodes, int maxRetryCount, - long retryIntervalInMs) + long retryIntervalInMs, + String sqlDialect) throws IoTDBConnectionException { - super(session, endPoint, zoneId, availableNodes, maxRetryCount, retryIntervalInMs); + super(session, endPoint, zoneId, availableNodes, maxRetryCount, retryIntervalInMs, sqlDialect); } public SubscriptionSessionConnection( @@ -75,9 +76,10 @@ public class SubscriptionSessionConnection extends SessionConnection { ZoneId zoneId, Supplier<List<TEndPoint>> availableNodes, int maxRetryCount, - long retryIntervalInMs) + long retryIntervalInMs, + String sqlDialect) throws IoTDBConnectionException { - super(session, zoneId, availableNodes, maxRetryCount, retryIntervalInMs); + super(session, zoneId, availableNodes, maxRetryCount, retryIntervalInMs, sqlDialect); } // from org.apache.iotdb.session.NodesSupplier.updateDataNodeList diff --git a/iotdb-client/session/src/test/java/org/apache/iotdb/session/SessionConnectionTest.java b/iotdb-client/session/src/test/java/org/apache/iotdb/session/SessionConnectionTest.java index 61a5abd3865..f4be6898d2f 100644 --- a/iotdb-client/session/src/test/java/org/apache/iotdb/session/SessionConnectionTest.java +++ b/iotdb-client/session/src/test/java/org/apache/iotdb/session/SessionConnectionTest.java @@ -172,7 +172,8 @@ public class SessionConnectionTest { ZoneId.systemDefault(), () -> Collections.singletonList(new TEndPoint("local", 12)), SessionConfig.MAX_RETRY_COUNT, - SessionConfig.RETRY_INTERVAL_IN_MS); + SessionConfig.RETRY_INTERVAL_IN_MS, + "tree"); } @Test(expected = IoTDBConnectionException.class) @@ -192,7 +193,8 @@ public class SessionConnectionTest { ZoneId.systemDefault(), () -> Collections.singletonList(new TEndPoint("local", 12)), SessionConfig.MAX_RETRY_COUNT, - SessionConfig.RETRY_INTERVAL_IN_MS); + SessionConfig.RETRY_INTERVAL_IN_MS, + "tree"); } @Test diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/IClientSession.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/IClientSession.java index ab72b092f2a..ff3040a34ec 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/IClientSession.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/IClientSession.java @@ -23,6 +23,8 @@ import org.apache.iotdb.commons.conf.IoTDBConstant.ClientVersion; import org.apache.iotdb.service.rpc.thrift.TSConnectionInfo; import org.apache.iotdb.service.rpc.thrift.TSConnectionType; +import javax.annotation.Nullable; + import java.time.ZoneId; import java.util.Set; import java.util.TimeZone; @@ -43,6 +45,10 @@ public abstract class IClientSession { private long logInTime; + private SqlDialect sqlDialect = SqlDialect.TREE; + + @Nullable private String databaseName; + public abstract String getClientAddress(); abstract int getClientPort(); @@ -135,11 +141,24 @@ public abstract class IClientSession { public abstract void removeQueryId(Long statementId, Long queryId); - public Model getModel() { - return Model.TABLE; + public SqlDialect getSqlDialect() { + return sqlDialect; + } + + public void setSqlDialect(SqlDialect sqlDialect) { + this.sqlDialect = sqlDialect; + } + + @Nullable + public String getDatabaseName() { + return databaseName; + } + + public void setDatabaseName(@Nullable String databaseName) { + this.databaseName = databaseName; } - public enum Model { + public enum SqlDialect { TREE, TABLE } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java index edf0259e74e..a3f2c4358ce 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java @@ -103,6 +103,24 @@ public class SessionManager implements SessionManagerMBean { String zoneId, TSProtocolVersion tsProtocolVersion, IoTDBConstant.ClientVersion clientVersion) { + return login( + session, + username, + password, + zoneId, + tsProtocolVersion, + clientVersion, + IClientSession.SqlDialect.TREE); + } + + public BasicOpenSessionResp login( + IClientSession session, + String username, + String password, + String zoneId, + TSProtocolVersion tsProtocolVersion, + IoTDBConstant.ClientVersion clientVersion, + IClientSession.SqlDialect sqlDialect) { TSStatus loginStatus; BasicOpenSessionResp openSessionResp = new BasicOpenSessionResp(); @@ -115,8 +133,8 @@ public class SessionManager implements SessionManagerMBean { .setCode(TSStatusCode.INCOMPATIBLE_VERSION.getStatusCode()) .setMessage("The version is incompatible, please upgrade to " + IoTDBConstant.VERSION); } else { + session.setSqlDialect(sqlDialect); supplySession(session, username, ZoneId.of(zoneId), clientVersion); - openSessionResp .sessionId(session.getId()) .setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()) @@ -356,7 +374,11 @@ public class SessionManager implements SessionManagerMBean { public SessionInfo getSessionInfo(IClientSession session) { return new SessionInfo( - session.getId(), session.getUsername(), session.getZoneId(), session.getClientVersion()); + session.getId(), + session.getUsername(), + session.getZoneId(), + session.getClientVersion(), + session.getDatabaseName()); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java index e677796f271..a8ba63cc813 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java @@ -81,6 +81,15 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationSt import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.GroupByTimeParameter; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions; +import org.apache.iotdb.db.queryengine.plan.relational.function.OperatorType; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnHandle; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.OperatorNotFoundException; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.ResolvedFunction; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableHandle; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableMetadata; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableSchema; import org.apache.iotdb.db.queryengine.plan.statement.Statement; import org.apache.iotdb.db.queryengine.plan.statement.StatementType; import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; @@ -173,6 +182,7 @@ import org.apache.iotdb.tsfile.read.common.block.TsBlock; import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder; import org.apache.iotdb.tsfile.read.common.block.column.Column; import org.apache.iotdb.tsfile.read.common.block.column.TsBlockSerde; +import org.apache.iotdb.tsfile.read.common.type.Type; import org.apache.iotdb.tsfile.read.filter.basic.Filter; import org.apache.iotdb.tsfile.read.filter.factory.TimeFilterApi; import org.apache.iotdb.tsfile.utils.Binary; @@ -230,6 +240,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { private final ISchemaFetcher schemaFetcher; + private final Metadata metadata; + private final SqlParser relationSqlParser; private final TsBlockSerde serde = new TsBlockSerde(); @@ -265,6 +277,42 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { public ClientRPCServiceImpl() { partitionFetcher = ClusterPartitionFetcher.getInstance(); schemaFetcher = ClusterSchemaFetcher.getInstance(); + metadata = + new Metadata() { + @Override + public boolean tableExists(QualifiedObjectName name) { + return false; + } + + @Override + public TableSchema getTableSchema(SessionInfo session, TableHandle tableHandle) { + return null; + } + + @Override + public TableMetadata getTableMetadata(SessionInfo session, TableHandle tableHandle) { + return null; + } + + @Override + public Optional<TableHandle> getTableHandle( + SessionInfo session, QualifiedObjectName name) { + return Optional.empty(); + } + + @Override + public Map<String, ColumnHandle> getColumnHandles( + SessionInfo session, TableHandle tableHandle) { + return null; + } + + @Override + public ResolvedFunction resolveOperator( + OperatorType operatorType, List<? extends Type> argumentTypes) + throws OperatorNotFoundException { + return null; + } + }; relationSqlParser = new SqlParser(); } @@ -284,38 +332,65 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { StatementType statementType = null; Throwable t = null; try { - Statement s = StatementGenerator.createStatement(statement, clientSession.getZoneId()); + // create and cache dataset + ExecutionResult result; + if (clientSession.getSqlDialect() == IClientSession.SqlDialect.TREE) { + Statement s = StatementGenerator.createStatement(statement, clientSession.getZoneId()); + + if (s == null) { + return RpcUtils.getTSExecuteStatementResp( + RpcUtils.getStatus( + TSStatusCode.SQL_PARSE_ERROR, "This operation type is not supported")); + } + // permission check + TSStatus status = AuthorityChecker.checkAuthority(s, clientSession); + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return RpcUtils.getTSExecuteStatementResp(status); + } - if (s == null) { - return RpcUtils.getTSExecuteStatementResp( - RpcUtils.getStatus( - TSStatusCode.SQL_PARSE_ERROR, "This operation type is not supported")); - } - // permission check - TSStatus status = AuthorityChecker.checkAuthority(s, clientSession); - if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - return RpcUtils.getTSExecuteStatementResp(status); - } + quota = + DataNodeThrottleQuotaManager.getInstance() + .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s); + statementType = s.getType(); + if (ENABLE_AUDIT_LOG) { + AuditLogger.log(statement, s); + } - quota = - DataNodeThrottleQuotaManager.getInstance() - .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s); - statementType = s.getType(); - if (ENABLE_AUDIT_LOG) { - AuditLogger.log(statement, s); - } + queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId); + + result = + COORDINATOR.executeForTreeModel( + s, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + statement, + partitionFetcher, + schemaFetcher, + req.getTimeout()); + } else { + org.apache.iotdb.db.relational.sql.tree.Statement s = + relationSqlParser.createStatement(statement); - queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId); - // create and cache dataset - ExecutionResult result = - COORDINATOR.executeForTreeModel( - s, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - statement, - partitionFetcher, - schemaFetcher, - req.getTimeout()); + if (s == null) { + return RpcUtils.getTSExecuteStatementResp( + RpcUtils.getStatus( + TSStatusCode.SQL_PARSE_ERROR, "This operation type is not supported")); + } + + queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId); + + // TODO audit log, quota, StatementType + result = + COORDINATOR.executeForTableModel( + s, + relationSqlParser, + clientSession, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + statement, + metadata, + req.getTimeout()); + } if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode() && result.status.code != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { @@ -331,7 +406,9 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { resp.setStatus(result.status); finished = setResult.apply(resp, queryExecution, req.fetchSize); resp.setMoreData(!finished); - quota.addReadResult(resp.getQueryResult()); + if (quota != null) { + quota.addReadResult(resp.getQueryResult()); + } } else { finished = true; resp = RpcUtils.getTSExecuteStatementResp(result.status); @@ -1121,6 +1198,14 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { @Override public TSOpenSessionResp openSession(TSOpenSessionReq req) throws TException { IoTDBConstant.ClientVersion clientVersion = parseClientVersion(req); + IClientSession.SqlDialect sqlDialect; + try { + sqlDialect = parseSqlDialect(req); + } catch (IllegalArgumentException e) { + TSStatus tsStatus = RpcUtils.getStatus(TSStatusCode.UNSUPPORTED_SQL_DIALECT, e.getMessage()); + TSOpenSessionResp resp = new TSOpenSessionResp(tsStatus, CURRENT_RPC_VERSION); + return resp.setSessionId(-1); + } BasicOpenSessionResp openSessionResp = SESSION_MANAGER.login( SESSION_MANAGER.getCurrSession(), @@ -1128,7 +1213,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { req.password, req.zoneId, req.client_protocol, - clientVersion); + clientVersion, + sqlDialect); TSStatus tsStatus = RpcUtils.getStatus(openSessionResp.getCode(), openSessionResp.getMessage()); TSOpenSessionResp resp = new TSOpenSessionResp(tsStatus, CURRENT_RPC_VERSION); return resp.setSessionId(openSessionResp.getSessionId()); @@ -1142,6 +1228,22 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { return IoTDBConstant.ClientVersion.V_0_12; } + private IClientSession.SqlDialect parseSqlDialect(TSOpenSessionReq req) { + Map<String, String> configuration = req.configuration; + if (configuration != null && configuration.containsKey("sql_dialect")) { + String sqlDialect = configuration.get("sql_dialect"); + if ("tree".equalsIgnoreCase(sqlDialect)) { + return IClientSession.SqlDialect.TREE; + } else if ("table".equalsIgnoreCase(sqlDialect)) { + return IClientSession.SqlDialect.TABLE; + } else { + throw new IllegalArgumentException("Unknown sql_dialect: " + sqlDialect); + } + } else { + return IClientSession.SqlDialect.TREE; + } + } + @Override public TSStatus closeSession(TSCloseSessionReq req) { return new TSStatus( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/SessionInfo.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/SessionInfo.java index fa401c33645..958d39abafa 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/SessionInfo.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/SessionInfo.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.queryengine.common; import org.apache.iotdb.commons.conf.IoTDBConstant.ClientVersion; +import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.queryengine.plan.relational.security.Identity; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; @@ -48,19 +49,22 @@ public class SessionInfo { this.databaseName = null; } - public SessionInfo(long sessionId, String userName, ZoneId zoneId, ClientVersion version) { - this.sessionId = sessionId; - this.userName = userName; - this.zoneId = zoneId; - this.version = version; - this.databaseName = null; + @TestOnly + public SessionInfo( + long sessionId, String userName, ZoneId zoneId, @Nullable String databaseName) { + this(sessionId, userName, zoneId, ClientVersion.V_1_0, databaseName); } public SessionInfo( - long sessionId, String userName, ZoneId zoneId, @Nullable String databaseName) { + long sessionId, + String userName, + ZoneId zoneId, + ClientVersion version, + @Nullable String databaseName) { this.sessionId = sessionId; this.userName = userName; this.zoneId = zoneId; + this.version = version; this.databaseName = databaseName; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java index 78c5be03828..6ab15a5a3a7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java @@ -29,12 +29,14 @@ import org.apache.iotdb.commons.concurrent.ThreadName; import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.protocol.session.IClientSession; import org.apache.iotdb.db.queryengine.common.DataNodeEndPoints; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.QueryId; import org.apache.iotdb.db.queryengine.common.SessionInfo; import org.apache.iotdb.db.queryengine.execution.QueryIdGenerator; import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher; +import org.apache.iotdb.db.queryengine.plan.analyze.QueryType; import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeSchemaCache; import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher; import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult; @@ -42,9 +44,21 @@ import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution; import org.apache.iotdb.db.queryengine.plan.execution.QueryExecution; import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigExecution; import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskVisitor; +import org.apache.iotdb.db.queryengine.plan.execution.config.TableConfigTaskVisitor; import org.apache.iotdb.db.queryengine.plan.planner.TreeModelPlanner; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata; +import org.apache.iotdb.db.queryengine.plan.relational.planner.RelationalModelPlanner; import org.apache.iotdb.db.queryengine.plan.statement.IConfigStatement; import org.apache.iotdb.db.queryengine.plan.statement.Statement; +import org.apache.iotdb.db.relational.sql.parser.SqlParser; +import org.apache.iotdb.db.relational.sql.tree.CreateDB; +import org.apache.iotdb.db.relational.sql.tree.CreateTable; +import org.apache.iotdb.db.relational.sql.tree.DescribeTable; +import org.apache.iotdb.db.relational.sql.tree.DropDB; +import org.apache.iotdb.db.relational.sql.tree.DropTable; +import org.apache.iotdb.db.relational.sql.tree.ShowDB; +import org.apache.iotdb.db.relational.sql.tree.ShowTables; +import org.apache.iotdb.db.relational.sql.tree.Use; import org.apache.iotdb.db.utils.SetThreadName; import org.slf4j.Logger; @@ -211,6 +225,68 @@ public class Coordinator { return new QueryExecution(treeModelPlanner, queryContext, executor); } + public ExecutionResult executeForTableModel( + org.apache.iotdb.db.relational.sql.tree.Statement statement, + SqlParser sqlParser, + IClientSession clientSession, + long queryId, + SessionInfo session, + String sql, + Metadata metadata, + long timeOut) { + return execution( + queryId, + session, + sql, + ((queryContext, startTime) -> + createQueryExecutionForTableModel( + statement, + sqlParser, + clientSession, + queryContext, + metadata, + timeOut > 0 ? timeOut : CONFIG.getQueryTimeoutThreshold(), + startTime))); + } + + private IQueryExecution createQueryExecutionForTableModel( + org.apache.iotdb.db.relational.sql.tree.Statement statement, + SqlParser sqlParser, + IClientSession clientSession, + MPPQueryContext queryContext, + Metadata metadata, + long timeOut, + long startTime) { + queryContext.setTimeOut(timeOut); + queryContext.setStartTime(startTime); + if (statement instanceof DropDB + || statement instanceof ShowDB + || statement instanceof CreateDB + || statement instanceof Use + || statement instanceof CreateTable + || statement instanceof DescribeTable + || statement instanceof ShowTables + || statement instanceof DropTable) { + queryContext.setQueryType(QueryType.WRITE); + return new ConfigExecution( + queryContext, + null, + executor, + statement.accept(new TableConfigTaskVisitor(clientSession), queryContext)); + } + RelationalModelPlanner treeModelPlanner = + new RelationalModelPlanner( + statement, + sqlParser, + metadata, + executor, + writeOperationExecutor, + scheduledExecutor, + SYNC_INTERNAL_SERVICE_CLIENT_MANAGER, + ASYNC_INTERNAL_SERVICE_CLIENT_MANAGER); + return new QueryExecution(treeModelPlanner, queryContext, executor); + } + public IQueryExecution getQueryExecution(Long queryId) { return queryExecutionMap.get(queryId); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java new file mode 100644 index 00000000000..6061bb63d87 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.execution.config; + +import org.apache.iotdb.db.protocol.session.IClientSession; +import org.apache.iotdb.db.queryengine.common.MPPQueryContext; +import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational.CreateTableTask; +import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational.UseDBTask; +import org.apache.iotdb.db.relational.sql.tree.AstVisitor; +import org.apache.iotdb.db.relational.sql.tree.CreateDB; +import org.apache.iotdb.db.relational.sql.tree.CreateTable; +import org.apache.iotdb.db.relational.sql.tree.CurrentDatabase; +import org.apache.iotdb.db.relational.sql.tree.DescribeTable; +import org.apache.iotdb.db.relational.sql.tree.DropDB; +import org.apache.iotdb.db.relational.sql.tree.DropTable; +import org.apache.iotdb.db.relational.sql.tree.Node; +import org.apache.iotdb.db.relational.sql.tree.ShowDB; +import org.apache.iotdb.db.relational.sql.tree.ShowTables; +import org.apache.iotdb.db.relational.sql.tree.Use; + +public class TableConfigTaskVisitor extends AstVisitor<IConfigTask, MPPQueryContext> { + + private final IClientSession clientSession; + + public TableConfigTaskVisitor(IClientSession clientSession) { + this.clientSession = clientSession; + } + + @Override + protected IConfigTask visitNode(Node node, MPPQueryContext context) { + throw new UnsupportedOperationException( + "Unsupported statement type: " + node.getClass().getName()); + } + + @Override + protected IConfigTask visitCreateDB(CreateDB node, MPPQueryContext context) { + return super.visitCreateDB(node, context); + } + + @Override + protected IConfigTask visitUse(Use node, MPPQueryContext context) { + return new UseDBTask(node, clientSession); + } + + @Override + protected IConfigTask visitDropDB(DropDB node, MPPQueryContext context) { + return super.visitDropDB(node, context); + } + + @Override + protected IConfigTask visitShowDB(ShowDB node, MPPQueryContext context) { + return super.visitShowDB(node, context); + } + + @Override + protected IConfigTask visitCreateTable(CreateTable node, MPPQueryContext context) { + String currentDatabase = clientSession.getDatabaseName(); + // TODO using currentDatabase to normalize QualifiedName in CreateTable + + return new CreateTableTask(node); + } + + @Override + protected IConfigTask visitDropTable(DropTable node, MPPQueryContext context) { + return super.visitDropTable(node, context); + } + + @Override + protected IConfigTask visitShowTables(ShowTables node, MPPQueryContext context) { + return super.visitShowTables(node, context); + } + + @Override + protected IConfigTask visitDescribeTable(DescribeTable node, MPPQueryContext context) { + return super.visitDescribeTable(node, context); + } + + @Override + protected IConfigTask visitCurrentDatabase(CurrentDatabase node, MPPQueryContext context) { + return super.visitCurrentDatabase(node, context); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java index e844a547093..24fc52de6bd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java @@ -114,6 +114,7 @@ import org.apache.iotdb.db.protocol.client.ConfigNodeClient; import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager; import org.apache.iotdb.db.protocol.client.ConfigNodeInfo; import org.apache.iotdb.db.protocol.client.DataNodeClientPoolFactory; +import org.apache.iotdb.db.protocol.session.IClientSession; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree; import org.apache.iotdb.db.queryengine.plan.Coordinator; @@ -199,6 +200,11 @@ import org.apache.iotdb.db.queryengine.plan.statement.sys.quota.SetSpaceQuotaSta import org.apache.iotdb.db.queryengine.plan.statement.sys.quota.SetThrottleQuotaStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.quota.ShowSpaceQuotaStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.quota.ShowThrottleQuotaStatement; +import org.apache.iotdb.db.relational.sql.tree.CreateDB; +import org.apache.iotdb.db.relational.sql.tree.CreateTable; +import org.apache.iotdb.db.relational.sql.tree.DropDB; +import org.apache.iotdb.db.relational.sql.tree.ShowDB; +import org.apache.iotdb.db.relational.sql.tree.Use; import org.apache.iotdb.db.schemaengine.SchemaEngine; import org.apache.iotdb.db.schemaengine.rescon.DataNodeSchemaQuotaManager; import org.apache.iotdb.db.schemaengine.template.ClusterTemplateManager; @@ -2668,4 +2674,33 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { .setMessage(e.toString())); } } + + @Override + public SettableFuture<ConfigTaskResult> showDatabases(ShowDB showDB) { + return null; + } + + @Override + public SettableFuture<ConfigTaskResult> useDatabase(Use useDB, IClientSession clientSession) { + SettableFuture<ConfigTaskResult> future = SettableFuture.create(); + // TODO check whether the database exists + clientSession.setDatabaseName(useDB.getDatabase().getValue()); + future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); + return future; + } + + @Override + public SettableFuture<ConfigTaskResult> dropDatabase(DropDB dropDB) { + return null; + } + + @Override + public SettableFuture<ConfigTaskResult> createDatabase(CreateDB createDB) { + return null; + } + + @Override + public SettableFuture<ConfigTaskResult> createTable(CreateTable createTable) { + return null; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java index e39602a941d..a13ddaef735 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java @@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.cluster.NodeStatus; import org.apache.iotdb.confignode.rpc.thrift.TSpaceQuotaResp; import org.apache.iotdb.confignode.rpc.thrift.TThrottleQuotaResp; +import org.apache.iotdb.db.protocol.session.IClientSession; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.AlterLogicalViewNode; @@ -73,6 +74,11 @@ import org.apache.iotdb.db.queryengine.plan.statement.sys.quota.SetSpaceQuotaSta import org.apache.iotdb.db.queryengine.plan.statement.sys.quota.SetThrottleQuotaStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.quota.ShowSpaceQuotaStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.quota.ShowThrottleQuotaStatement; +import org.apache.iotdb.db.relational.sql.tree.CreateDB; +import org.apache.iotdb.db.relational.sql.tree.CreateTable; +import org.apache.iotdb.db.relational.sql.tree.DropDB; +import org.apache.iotdb.db.relational.sql.tree.ShowDB; +import org.apache.iotdb.db.relational.sql.tree.Use; import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp; @@ -237,4 +243,16 @@ public interface IConfigTaskExecutor { TThrottleQuotaResp getThrottleQuota(); TPipeTransferResp handleTransferConfigPlan(TPipeTransferReq req); + + // =============================== table syntax ========================================= + + SettableFuture<ConfigTaskResult> showDatabases(ShowDB showDB); + + SettableFuture<ConfigTaskResult> useDatabase(Use useDB, IClientSession clientSession); + + SettableFuture<ConfigTaskResult> dropDatabase(DropDB dropDB); + + SettableFuture<ConfigTaskResult> createDatabase(CreateDB createDB); + + SettableFuture<ConfigTaskResult> createTable(CreateTable createTable); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/relational/CreateTableTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/relational/CreateTableTask.java new file mode 100644 index 00000000000..a9bca2aef6c --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/relational/CreateTableTask.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational; + +import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult; +import org.apache.iotdb.db.queryengine.plan.execution.config.IConfigTask; +import org.apache.iotdb.db.queryengine.plan.execution.config.executor.IConfigTaskExecutor; +import org.apache.iotdb.db.relational.sql.tree.CreateTable; + +import com.google.common.util.concurrent.ListenableFuture; + +public class CreateTableTask implements IConfigTask { + + private final CreateTable createTable; + + public CreateTableTask(CreateTable createTable) { + this.createTable = createTable; + } + + @Override + public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor configTaskExecutor) + throws InterruptedException { + return configTaskExecutor.createTable(createTable); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/relational/UseDBTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/relational/UseDBTask.java new file mode 100644 index 00000000000..7c647e434ec --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/relational/UseDBTask.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational; + +import org.apache.iotdb.db.protocol.session.IClientSession; +import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult; +import org.apache.iotdb.db.queryengine.plan.execution.config.IConfigTask; +import org.apache.iotdb.db.queryengine.plan.execution.config.executor.IConfigTaskExecutor; +import org.apache.iotdb.db.relational.sql.tree.Use; + +import com.google.common.util.concurrent.ListenableFuture; + +public class UseDBTask implements IConfigTask { + + private final Use useDB; + + private final IClientSession clientSession; + + public UseDBTask(Use useDB, IClientSession clientSession) { + this.useDB = useDB; + this.clientSession = clientSession; + } + + @Override + public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor configTaskExecutor) + throws InterruptedException { + return configTaskExecutor.useDatabase(useDB, clientSession); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analyzer.java index 741f49017d4..28470a3c991 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analyzer.java @@ -41,7 +41,7 @@ public class Analyzer { private final WarningCollector warningCollector; - Analyzer( + public Analyzer( SessionInfo session, StatementAnalyzerFactory statementAnalyzerFactory, List<Expression> parameters, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationalModelPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationalModelPlanner.java index 5e8486191ca..fc1cf051c98 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationalModelPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationalModelPlanner.java @@ -21,22 +21,84 @@ package org.apache.iotdb.db.queryengine.plan.relational.planner; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.client.IClientManager; +import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient; +import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.execution.QueryStateMachine; +import org.apache.iotdb.db.queryengine.execution.warnings.WarningCollector; import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis; import org.apache.iotdb.db.queryengine.plan.planner.IPlanner; import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan; import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan; +import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analyzer; +import org.apache.iotdb.db.queryengine.plan.relational.analyzer.StatementAnalyzerFactory; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata; +import org.apache.iotdb.db.queryengine.plan.relational.security.AccessControl; import org.apache.iotdb.db.queryengine.plan.scheduler.IScheduler; +import org.apache.iotdb.db.relational.sql.parser.SqlParser; +import org.apache.iotdb.db.relational.sql.tree.Statement; import org.apache.iotdb.rpc.TSStatusCode; +import java.util.Collections; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; public class RelationalModelPlanner implements IPlanner { + private final Statement statement; + + private final SqlParser sqlParser; + private final Metadata metadata; + + // TODO access control + private final AccessControl accessControl = new NopAccessControl(); + + private final WarningCollector warningCollector = WarningCollector.NOOP; + + private final ExecutorService executor; + private final ExecutorService writeOperationExecutor; + private final ScheduledExecutorService scheduledExecutor; + + private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> + syncInternalServiceClientManager; + + private final IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> + asyncInternalServiceClientManager; + + public RelationalModelPlanner( + Statement statement, + SqlParser sqlParser, + Metadata metadata, + ExecutorService executor, + ExecutorService writeOperationExecutor, + ScheduledExecutorService scheduledExecutor, + IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> syncInternalServiceClientManager, + IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> + asyncInternalServiceClientManager) { + this.statement = statement; + this.sqlParser = sqlParser; + this.metadata = metadata; + this.executor = executor; + this.writeOperationExecutor = writeOperationExecutor; + this.scheduledExecutor = scheduledExecutor; + this.syncInternalServiceClientManager = syncInternalServiceClientManager; + this.asyncInternalServiceClientManager = asyncInternalServiceClientManager; + } + @Override public IAnalysis analyze(MPPQueryContext context) { - return null; + StatementAnalyzerFactory statementAnalyzerFactory = + new StatementAnalyzerFactory(metadata, sqlParser, accessControl); + + Analyzer analyzer = + new Analyzer( + context.getSession(), + statementAnalyzerFactory, + Collections.emptyList(), + Collections.emptyMap(), + warningCollector); + return analyzer.analyze(statement); } @Override @@ -69,4 +131,6 @@ public class RelationalModelPlanner implements IPlanner { @Override public void setRedirectInfo( IAnalysis analysis, TEndPoint localEndPoint, TSStatus tsstatus, TSStatusCode statusCode) {} + + private static class NopAccessControl implements AccessControl {} }
