This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new b5690a7137 Adding DML definition and parse SQL InsertFile (#8557)
b5690a7137 is described below
commit b5690a7137a6b7e3313bc665104ff543b659992f
Author: Xiang Fu <[email protected]>
AuthorDate: Sat Apr 30 03:57:44 2022 -0700
Adding DML definition and parse SQL InsertFile (#8557)
* Adding DML definition and parse sql
* Use MinionClient for execute async task
* Add integration tests
---
.../broker/api/resources/PinotClientRequest.java | 45 +++++++-
.../broker/broker/BrokerAdminApiApplication.java | 4 +-
.../broker/broker/helix/BaseBrokerStarter.java | 12 +-
.../apache/pinot/common/minion/MinionClient.java | 21 ++++
.../common/minion/MinionRequestURLBuilder.java | 4 +
.../apache/pinot/sql/parsers/CalciteSqlParser.java | 51 ++++++++-
.../org/apache/pinot/sql/parsers/PinotSqlType.java | 30 +++++
.../pinot/sql/parsers/SqlNodeAndOptions.java | 41 +++++++
.../sql/parsers/dml/DataManipulationStatement.java | 61 ++++++++++
.../dml/DataManipulationStatementParser.java | 37 ++++++
.../pinot/sql/parsers/dml/InsertIntoFile.java | 102 +++++++++++++++++
.../pinot/sql/parsers/CalciteSqlCompilerTest.java | 1 +
.../pinot/sql/parsers/dml/InsertIntoFileTest.java | 66 +++++++++++
.../pinot/controller/BaseControllerStarter.java | 4 +
.../api/resources/PinotQueryResource.java | 48 +++++++-
.../core/query/executor/sql/SqlQueryExecutor.java | 124 +++++++++++++++++++++
.../pinot/integration/tests/ClusterTest.java | 28 +++++
.../tests/BaseClusterIntegrationTestSet.java | 2 +-
...mentGenerationMinionClusterIntegrationTest.java | 46 ++++++++
.../apache/pinot/spi/utils/CommonConstants.java | 2 +
20 files changed, 712 insertions(+), 17 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
index aa575c3c2b..8618beca67 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
@@ -27,6 +27,8 @@ import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
+import java.util.HashMap;
+import java.util.Map;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
@@ -42,11 +44,17 @@ import javax.ws.rs.core.Response;
import org.apache.pinot.broker.api.HttpRequesterIdentity;
import org.apache.pinot.broker.api.RequestStatistics;
import org.apache.pinot.broker.requesthandler.BrokerRequestHandler;
+import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.core.query.executor.sql.SqlQueryExecutor;
import org.apache.pinot.spi.utils.CommonConstants.Broker.Request;
import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.sql.parsers.CalciteSqlParser;
+import org.apache.pinot.sql.parsers.PinotSqlType;
+import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
import org.glassfish.jersey.server.ManagedAsync;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,6 +65,9 @@ import org.slf4j.LoggerFactory;
public class PinotClientRequest {
private static final Logger LOGGER =
LoggerFactory.getLogger(PinotClientRequest.class);
+ @Inject
+ SqlQueryExecutor _sqlQueryExecutor;
+
@Inject
private BrokerRequestHandler _requestHandler;
@@ -158,8 +169,7 @@ public class PinotClientRequest {
if (debugOptions != null) {
requestJson.put(Request.DEBUG_OPTIONS, debugOptions);
}
- BrokerResponse brokerResponse =
- _requestHandler.handleRequest(requestJson,
makeHttpIdentity(requestContext), new RequestStatistics());
+ BrokerResponse brokerResponse = executeSqlQuery(requestJson,
makeHttpIdentity(requestContext), true);
asyncResponse.resume(brokerResponse.toJsonString());
} catch (Exception e) {
LOGGER.error("Caught exception while processing GET request", e);
@@ -187,8 +197,7 @@ public class PinotClientRequest {
String queryOptions = constructSqlQueryOptions();
// the only query options as of now are sql related. do not allow any
custom query options in sql endpoint
ObjectNode sqlRequestJson = ((ObjectNode)
requestJson).put(Request.QUERY_OPTIONS, queryOptions);
- BrokerResponse brokerResponse =
- _requestHandler.handleRequest(sqlRequestJson,
makeHttpIdentity(requestContext), new RequestStatistics());
+ BrokerResponse brokerResponse = executeSqlQuery(sqlRequestJson,
makeHttpIdentity(requestContext), false);
asyncResponse.resume(brokerResponse.toJsonString());
} catch (Exception e) {
LOGGER.error("Caught exception while processing POST request", e);
@@ -197,6 +206,34 @@ public class PinotClientRequest {
}
}
+ private BrokerResponse executeSqlQuery(ObjectNode sqlRequestJson,
HttpRequesterIdentity httpRequesterIdentity,
+ boolean onlyDql)
+ throws Exception {
+ SqlNodeAndOptions sqlNodeAndOptions;
+ try {
+ sqlNodeAndOptions =
CalciteSqlParser.compileToSqlNodeAndOptions(sqlRequestJson.get(Request.SQL).asText());
+ } catch (Exception e) {
+ return new
BrokerResponseNative(QueryException.getException(QueryException.PQL_PARSING_ERROR,
e));
+ }
+ PinotSqlType sqlType =
CalciteSqlParser.extractSqlType(sqlNodeAndOptions.getSqlNode());
+ if (onlyDql && sqlType != PinotSqlType.DQL) {
+ return new
BrokerResponseNative(QueryException.getException(QueryException.PQL_PARSING_ERROR,
+ new UnsupportedOperationException("Unsupported SQL type - " +
sqlType + ", GET API only supports DQL.")));
+ }
+ switch (sqlType) {
+ case DQL:
+ return _requestHandler.handleRequest(sqlRequestJson,
httpRequesterIdentity, new RequestStatistics());
+ case DML:
+ Map<String, String> headers = new HashMap<>();
+ httpRequesterIdentity.getHttpHeaders().entries()
+ .forEach(entry -> headers.put(entry.getKey(), entry.getValue()));
+ return _sqlQueryExecutor.executeDMLStatement(sqlNodeAndOptions,
headers);
+ default:
+ return new
BrokerResponseNative(QueryException.getException(QueryException.PQL_PARSING_ERROR,
+ new UnsupportedOperationException("Unsupported SQL type - " +
sqlType)));
+ }
+ }
+
private String constructSqlQueryOptions() {
return Request.QueryOptionKey.GROUP_BY_MODE + "=" + Request.SQL + ";" +
Request.QueryOptionKey.RESPONSE_FORMAT + "="
+ Request.SQL;
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerAdminApiApplication.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerAdminApiApplication.java
index afd4b807eb..fdb701b9f2 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerAdminApiApplication.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerAdminApiApplication.java
@@ -27,6 +27,7 @@ import
org.apache.pinot.broker.requesthandler.BrokerRequestHandler;
import org.apache.pinot.broker.routing.BrokerRoutingManager;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.core.api.ServiceAutoDiscoveryFeature;
+import org.apache.pinot.core.query.executor.sql.SqlQueryExecutor;
import org.apache.pinot.core.transport.ListenerConfig;
import org.apache.pinot.core.util.ListenerConfigUtil;
import org.apache.pinot.spi.env.PinotConfiguration;
@@ -48,7 +49,7 @@ public class BrokerAdminApiApplication extends ResourceConfig
{
private HttpServer _httpServer;
public BrokerAdminApiApplication(BrokerRoutingManager routingManager,
BrokerRequestHandler brokerRequestHandler,
- BrokerMetrics brokerMetrics, PinotConfiguration brokerConf) {
+ BrokerMetrics brokerMetrics, PinotConfiguration brokerConf,
SqlQueryExecutor sqlQueryExecutor) {
packages(RESOURCE_PACKAGE);
property(PINOT_CONFIGURATION, brokerConf);
if
(brokerConf.getProperty(CommonConstants.Broker.BROKER_SERVICE_AUTO_DISCOVERY,
false)) {
@@ -57,6 +58,7 @@ public class BrokerAdminApiApplication extends ResourceConfig
{
register(new AbstractBinder() {
@Override
protected void configure() {
+ bind(sqlQueryExecutor).to(SqlQueryExecutor.class);
bind(routingManager).to(BrokerRoutingManager.class);
bind(brokerRequestHandler).to(BrokerRequestHandler.class);
bind(brokerMetrics).to(BrokerMetrics.class);
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
index 852fc5ae41..8e232fb49e 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
@@ -58,6 +58,7 @@ import org.apache.pinot.common.utils.ServiceStatus;
import org.apache.pinot.common.utils.TlsUtils;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.common.utils.helix.HelixHelper;
+import org.apache.pinot.core.query.executor.sql.SqlQueryExecutor;
import org.apache.pinot.core.transport.ListenerConfig;
import org.apache.pinot.core.util.ListenerConfigUtil;
import org.apache.pinot.spi.env.PinotConfiguration;
@@ -103,6 +104,7 @@ public abstract class BaseBrokerStarter implements
ServiceStartable {
protected BrokerRoutingManager _routingManager;
protected AccessControlFactory _accessControlFactory;
protected BrokerRequestHandler _brokerRequestHandler;
+ protected SqlQueryExecutor _sqlQueryExecutor;
protected BrokerAdminApiApplication _brokerAdminApplication;
protected ClusterChangeMediator _clusterChangeMediator;
// Participant Helix manager handles Helix functionality such as state
transitions and messages
@@ -266,10 +268,16 @@ public abstract class BaseBrokerStarter implements
ServiceStartable {
}
}
_brokerRequestHandler.start();
-
+ String controllerUrl = _brokerConf.getProperty(Broker.CONTROLLER_URL);
+ if (controllerUrl != null) {
+ _sqlQueryExecutor = new SqlQueryExecutor(controllerUrl);
+ } else {
+ _sqlQueryExecutor = new SqlQueryExecutor(_spectatorHelixManager);
+ }
LOGGER.info("Starting broker admin application on: {}",
ListenerConfigUtil.toString(_listenerConfigs));
_brokerAdminApplication =
- new BrokerAdminApiApplication(_routingManager, _brokerRequestHandler,
_brokerMetrics, _brokerConf);
+ new BrokerAdminApiApplication(_routingManager, _brokerRequestHandler,
_brokerMetrics, _brokerConf,
+ _sqlQueryExecutor);
_brokerAdminApplication.start(_listenerConfigs);
LOGGER.info("Initializing cluster change mediator");
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionClient.java
b/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionClient.java
index 90b9902fc1..1909f0c8e1 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionClient.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionClient.java
@@ -27,10 +27,12 @@ import org.apache.commons.io.IOUtils;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.pinot.spi.annotations.InterfaceAudience;
import org.apache.pinot.spi.annotations.InterfaceStability;
+import org.apache.pinot.spi.config.task.AdhocTaskConfig;
import org.apache.pinot.spi.utils.JsonUtils;
@@ -109,6 +111,25 @@ public class MinionClient {
return responseString;
}
+ public Map<String, String> executeTask(AdhocTaskConfig adhocTaskConfig,
@Nullable Map<String, String> headers)
+ throws IOException {
+ HttpPost httpPost =
createHttpPostRequest(MinionRequestURLBuilder.baseUrl(getControllerUrl()).forTaskExecute());
+ httpPost.setEntity(new StringEntity(adhocTaskConfig.toJsonString()));
+ if (headers != null) {
+ headers.remove("content-length");
+ headers.entrySet().forEach(entry -> httpPost.setHeader(entry.getKey(),
entry.getValue()));
+ }
+ HttpResponse response = HTTP_CLIENT.execute(httpPost);
+ int statusCode = response.getStatusLine().getStatusCode();
+ final String responseString =
IOUtils.toString(response.getEntity().getContent());
+ if (statusCode >= 400) {
+ throw new HttpException(String
+ .format("Unable to get tasks states map. Error code %d, Error
message: %s", statusCode, responseString));
+ }
+ return JsonUtils.stringToObject(responseString, new
TypeReference<Map<String, String>>() {
+ });
+ }
+
private HttpGet createHttpGetRequest(String uri) {
HttpGet httpGet = new HttpGet(uri);
httpGet.setHeader(ACCEPT, APPLICATION_JSON);
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionRequestURLBuilder.java
b/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionRequestURLBuilder.java
index 35eb2b8d16..379227f387 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionRequestURLBuilder.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionRequestURLBuilder.java
@@ -90,4 +90,8 @@ public class MinionRequestURLBuilder {
public String forTaskTypeDelete(String taskType) {
return StringUtil.join("/", _baseUrl, "tasks", taskType);
}
+
+ public String forTaskExecute() {
+ return StringUtil.join("/", _baseUrl, "tasks/execute");
+ }
}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/sql/parsers/CalciteSqlParser.java
b/pinot-common/src/main/java/org/apache/pinot/sql/parsers/CalciteSqlParser.java
index 1a1a1ac46e..5edb31fce6 100644
---
a/pinot-common/src/main/java/org/apache/pinot/sql/parsers/CalciteSqlParser.java
+++
b/pinot-common/src/main/java/org/apache/pinot/sql/parsers/CalciteSqlParser.java
@@ -60,6 +60,7 @@ import org.apache.pinot.common.utils.request.RequestUtils;
import org.apache.pinot.pql.parsers.pql2.ast.FilterKind;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.spi.utils.Pairs;
+import org.apache.pinot.sql.parsers.parser.SqlInsertFromFile;
import org.apache.pinot.sql.parsers.parser.SqlParserImpl;
import org.apache.pinot.sql.parsers.rewriter.QueryRewriter;
import org.apache.pinot.sql.parsers.rewriter.QueryRewriterFactory;
@@ -107,6 +108,40 @@ public class CalciteSqlParser {
return sql;
}
+ public static SqlNodeAndOptions compileToSqlNodeAndOptions(String sql)
+ throws Exception {
+ // Remove the comments from the query
+ sql = removeComments(sql);
+
+ // Remove the terminating semicolon from the query
+ sql = removeTerminatingSemicolon(sql);
+
+ // Extract OPTION statements from the SQL, since the Calcite parser doesn't parse them.
+ List<String> options = extractOptionsFromSql(sql);
+ if (!options.isEmpty()) {
+ sql = removeOptionsFromSql(sql);
+ }
+
+ try (StringReader inStream = new StringReader(sql)) {
+ SqlParserImpl sqlParser = newSqlParser(inStream);
+ return new SqlNodeAndOptions(sqlParser.parseSqlStmtEof(), options);
+ } catch (Throwable e) {
+ throw new SqlCompilationException("Caught exception while parsing query:
" + sql, e);
+ }
+ }
+
+ public static PinotSqlType extractSqlType(SqlNode sqlNode) {
+ switch (sqlNode.getKind()) {
+ case OTHER_DDL:
+ if (sqlNode instanceof SqlInsertFromFile) {
+ return PinotSqlType.DML;
+ }
+ throw new SqlCompilationException("Unsupported SqlNode type - " +
sqlNode.getKind());
+ default:
+ return PinotSqlType.DQL;
+ }
+ }
+
public static PinotQuery compileToPinotQuery(String sql)
throws SqlCompilationException {
// Remove the comments from the query
@@ -324,10 +359,7 @@ public class CalciteSqlParser {
return sqlParser;
}
- private static void setOptions(PinotQuery pinotQuery, List<String>
optionsStatements) {
- if (optionsStatements.isEmpty()) {
- return;
- }
+ public static Map<String, String> extractOptionsMap(List<String>
optionsStatements) {
Map<String, String> options = new HashMap<>();
for (String optionsStatement : optionsStatements) {
for (String option : optionsStatement.split(",")) {
@@ -338,10 +370,17 @@ public class CalciteSqlParser {
options.put(splits[0].trim(), splits[1].trim());
}
}
- pinotQuery.setQueryOptions(options);
+ return options;
+ }
+
+ private static void setOptions(PinotQuery pinotQuery, List<String>
optionsStatements) {
+ if (optionsStatements.isEmpty()) {
+ return;
+ }
+ pinotQuery.setQueryOptions(extractOptionsMap(optionsStatements));
}
- private static PinotQuery compileSqlNodeToPinotQuery(SqlNode sqlNode) {
+ public static PinotQuery compileSqlNodeToPinotQuery(SqlNode sqlNode) {
PinotQuery pinotQuery = new PinotQuery();
if (sqlNode instanceof SqlExplain) {
// Extract sql node for the query
diff --git
a/pinot-common/src/main/java/org/apache/pinot/sql/parsers/PinotSqlType.java
b/pinot-common/src/main/java/org/apache/pinot/sql/parsers/PinotSqlType.java
new file mode 100644
index 0000000000..ae97409b3d
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/sql/parsers/PinotSqlType.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.sql.parsers;
+
+public enum PinotSqlType {
+ /* Data Query Language (DQL), e.g. SELECT */
+ DQL,
+ /* Data Control Language (DCL), e.g. GRANT, REVOKE */
+ DCL,
+ /* Data Manipulation Language (DML), e.g. INSERT, UPSERT, UPDATE, DELETE */
+ DML,
+ /* Data Definition Language (DDL), e.g. CREATE, DROP, ALTER, TRUNCATE */
+ DDL
+}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/sql/parsers/SqlNodeAndOptions.java
b/pinot-common/src/main/java/org/apache/pinot/sql/parsers/SqlNodeAndOptions.java
new file mode 100644
index 0000000000..9e81da7378
--- /dev/null
+++
b/pinot-common/src/main/java/org/apache/pinot/sql/parsers/SqlNodeAndOptions.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.sql.parsers;
+
+import java.util.List;
+import org.apache.calcite.sql.SqlNode;
+
+
+public class SqlNodeAndOptions {
+ private final SqlNode _sqlNode;
+ private final List<String> _options;
+
+ public SqlNodeAndOptions(SqlNode sqlNode, List<String> options) {
+ _sqlNode = sqlNode;
+ _options = options;
+ }
+
+ public SqlNode getSqlNode() {
+ return _sqlNode;
+ }
+
+ public List<String> getOptions() {
+ return _options;
+ }
+}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/sql/parsers/dml/DataManipulationStatement.java
b/pinot-common/src/main/java/org/apache/pinot/sql/parsers/dml/DataManipulationStatement.java
new file mode 100644
index 0000000000..25eb99b626
--- /dev/null
+++
b/pinot-common/src/main/java/org/apache/pinot/sql/parsers/dml/DataManipulationStatement.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.sql.parsers.dml;
+
+import java.util.List;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.spi.config.task.AdhocTaskConfig;
+
+
+/**
+ * DML Statement
+ */
+public interface DataManipulationStatement {
+ /**
+ * Returns the mechanism used to execute this statement, e.g. MINION or HTTP.
+ * @return the execution type of this statement
+ */
+ ExecutionType getExecutionType();
+
+ /**
+ * Generate minion task config for this statement.
+ * @return Adhoc minion task config
+ */
+ AdhocTaskConfig generateAdhocTaskConfig();
+
+ /**
+ * Execute the statement and format response to response row format.
+ * Not used for Minion ExecutionType.
+ * @return Result rows
+ */
+ List<Object[]> execute();
+
+ /**
+ * @return Result schema for response
+ */
+ DataSchema getResultSchema();
+
+ /**
+ * Execution method for this SQL statement.
+ */
+ enum ExecutionType {
+ HTTP,
+ MINION
+ }
+}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/sql/parsers/dml/DataManipulationStatementParser.java
b/pinot-common/src/main/java/org/apache/pinot/sql/parsers/dml/DataManipulationStatementParser.java
new file mode 100644
index 0000000000..88f02998de
--- /dev/null
+++
b/pinot-common/src/main/java/org/apache/pinot/sql/parsers/dml/DataManipulationStatementParser.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.sql.parsers.dml;
+
+import org.apache.calcite.sql.SqlNode;
+import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
+import org.apache.pinot.sql.parsers.parser.SqlInsertFromFile;
+
+
+public class DataManipulationStatementParser {
+ private DataManipulationStatementParser() {
+ }
+
+ public static DataManipulationStatement parse(SqlNodeAndOptions
sqlNodeAndOptions) {
+ SqlNode sqlNode = sqlNodeAndOptions.getSqlNode();
+ if (sqlNode instanceof SqlInsertFromFile) {
+ return InsertIntoFile.parse(sqlNodeAndOptions);
+ }
+ throw new UnsupportedOperationException("Unsupported DML SqlKind - " +
sqlNode.getKind());
+ }
+}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/sql/parsers/dml/InsertIntoFile.java
b/pinot-common/src/main/java/org/apache/pinot/sql/parsers/dml/InsertIntoFile.java
new file mode 100644
index 0000000000..aac4199496
--- /dev/null
+++
b/pinot-common/src/main/java/org/apache/pinot/sql/parsers/dml/InsertIntoFile.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.sql.parsers.dml;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.spi.config.task.AdhocTaskConfig;
+import org.apache.pinot.sql.parsers.CalciteSqlParser;
+import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
+import org.apache.pinot.sql.parsers.parser.SqlInsertFromFile;
+
+
+public class InsertIntoFile implements DataManipulationStatement {
+
+ private static final DataSchema INSERT_FROM_FILE_RESPONSE_SCHEMA =
+ new DataSchema(new String[]{"tableName", "taskJobName"},
+ new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING,
DataSchema.ColumnDataType.STRING});
+ private static final String TASK_NAME = "taskName";
+ private static final String TASK_TYPE = "taskType";
+ private static final String DEFAULT_TASK_TYPE =
"SegmentGenerationAndPushTask";
+ private static final String INPUT_DIR_URI = "inputDirURI";
+
+ private final String _table;
+ private final Map<String, String> _queryOptions;
+
+ public InsertIntoFile(String table, Map<String, String> queryOptions) {
+ _table = table;
+ _queryOptions = queryOptions;
+ }
+
+ public String getTable() {
+ return _table;
+ }
+
+ public Map<String, String> getQueryOptions() {
+ return _queryOptions;
+ }
+
+ public static InsertIntoFile parse(SqlNodeAndOptions sqlNodeAndOptions) {
+ SqlNode sqlNode = sqlNodeAndOptions.getSqlNode();
+ assert sqlNode instanceof SqlInsertFromFile;
+ SqlInsertFromFile sqlInsertFromFile = (SqlInsertFromFile) sqlNode;
+ // operandList[0] : database name
+ // operandList[1] : table name
+ // operandList[2] : file list
+ List<SqlNode> operandList = sqlInsertFromFile.getOperandList();
+ // Set table
+ String tableName = operandList.get(0) != null ? StringUtils.joinWith(",",
operandList.get(0), operandList.get(1))
+ : operandList.get(1).toString();
+ // Set Options
+ Map<String, String> optionsMap =
CalciteSqlParser.extractOptionsMap(sqlNodeAndOptions.getOptions());
+ List<String> inputDirList = new ArrayList<>();
+ ((SqlNodeList) operandList.get(2)).getList()
+ .forEach(sqlNode1 -> inputDirList.add(sqlNode1.toString().replace("'",
"")));
+ optionsMap.put(INPUT_DIR_URI, StringUtils.join(inputDirList, ","));
+ return new InsertIntoFile(tableName, optionsMap);
+ }
+
+ @Override
+ public ExecutionType getExecutionType() {
+ return ExecutionType.MINION;
+ }
+
+ @Override
+ public AdhocTaskConfig generateAdhocTaskConfig() {
+ Map<String, String> queryOptions = this.getQueryOptions();
+ String taskName = queryOptions.get(TASK_NAME);
+ String taskType = queryOptions.getOrDefault(TASK_TYPE, DEFAULT_TASK_TYPE);
+ return new AdhocTaskConfig(taskType, this.getTable(), taskName,
queryOptions);
+ }
+
+ @Override
+ public List<Object[]> execute() {
+ throw new UnsupportedOperationException("Not supported");
+ }
+
+ @Override
+ public DataSchema getResultSchema() {
+ return INSERT_FROM_FILE_RESPONSE_SCHEMA;
+ }
+}
diff --git
a/pinot-common/src/test/java/org/apache/pinot/sql/parsers/CalciteSqlCompilerTest.java
b/pinot-common/src/test/java/org/apache/pinot/sql/parsers/CalciteSqlCompilerTest.java
index 90f0d2604b..0bb6035986 100644
---
a/pinot-common/src/test/java/org/apache/pinot/sql/parsers/CalciteSqlCompilerTest.java
+++
b/pinot-common/src/test/java/org/apache/pinot/sql/parsers/CalciteSqlCompilerTest.java
@@ -2624,6 +2624,7 @@ public class CalciteSqlCompilerTest {
String customSql = "INSERT INTO db.tbl FROM FILE 'file:///tmp/file1', FILE
'file:///tmp/file2'";
SqlNode sqlNode = testSqlWithCustomSqlParser(customSql);
Assert.assertTrue(sqlNode instanceof SqlInsertFromFile);
+ Assert.assertEquals(CalciteSqlParser.extractSqlType(sqlNode),
PinotSqlType.DML);
}
private static SqlNode testSqlWithCustomSqlParser(String sqlString) {
diff --git
a/pinot-common/src/test/java/org/apache/pinot/sql/parsers/dml/InsertIntoFileTest.java
b/pinot-common/src/test/java/org/apache/pinot/sql/parsers/dml/InsertIntoFileTest.java
new file mode 100644
index 0000000000..ef688d32b5
--- /dev/null
+++
b/pinot-common/src/test/java/org/apache/pinot/sql/parsers/dml/InsertIntoFileTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.sql.parsers.dml;
+
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.spi.config.task.AdhocTaskConfig;
+import org.apache.pinot.sql.parsers.CalciteSqlParser;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class InsertIntoFileTest {
+
+ @Test
+ public void testInsertIntoStatementParser()
+ throws Exception {
+ String insertIntoSql = "INSERT INTO \"baseballStats\"\n"
+ + "FROM FILE 's3://my-bucket/path/to/data/'\n"
+ + "OPTION(taskName=myTask-1)\n"
+ +
"OPTION(input.fs.className=org.apache.pinot.plugin.filesystem.S3PinotFS)\n"
+ + "OPTION(input.fs.prop.accessKey=my-access-key)\n"
+ + "OPTION(input.fs.prop.secretKey=my-secret-key)\n"
+ + "OPTION(input.fs.prop.region=us-west-2)";
+ InsertIntoFile insertIntoFile =
InsertIntoFile.parse(CalciteSqlParser.compileToSqlNodeAndOptions(insertIntoSql));
+ Assert.assertEquals(insertIntoFile.getTable(), "baseballStats");
+ Assert.assertEquals(insertIntoFile.getExecutionType(),
DataManipulationStatement.ExecutionType.MINION);
+ Assert.assertEquals(insertIntoFile.getResultSchema(), new DataSchema(new
String[]{"tableName", "taskJobName"},
+ new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING,
DataSchema.ColumnDataType.STRING}));
+ Assert.assertEquals(insertIntoFile.getQueryOptions().size(), 6);
+ Assert.assertEquals(insertIntoFile.getQueryOptions().get("taskName"),
"myTask-1");
+
Assert.assertEquals(insertIntoFile.getQueryOptions().get("input.fs.className"),
+ "org.apache.pinot.plugin.filesystem.S3PinotFS");
+
Assert.assertEquals(insertIntoFile.getQueryOptions().get("input.fs.prop.accessKey"),
"my-access-key");
+
Assert.assertEquals(insertIntoFile.getQueryOptions().get("input.fs.prop.secretKey"),
"my-secret-key");
+
Assert.assertEquals(insertIntoFile.getQueryOptions().get("input.fs.prop.region"),
"us-west-2");
+ Assert.assertEquals(insertIntoFile.getQueryOptions().get("inputDirURI"),
"s3://my-bucket/path/to/data/");
+ AdhocTaskConfig adhocTaskConfig = insertIntoFile.generateAdhocTaskConfig();
+ Assert.assertEquals(adhocTaskConfig.getTaskType(),
"SegmentGenerationAndPushTask");
+ Assert.assertEquals(adhocTaskConfig.getTaskName(), "myTask-1");
+ Assert.assertEquals(adhocTaskConfig.getTableName(), "baseballStats");
+ Assert.assertEquals(adhocTaskConfig.getTaskConfigs().size(), 6);
+ Assert.assertEquals(adhocTaskConfig.getTaskConfigs().get("taskName"),
"myTask-1");
+
Assert.assertEquals(adhocTaskConfig.getTaskConfigs().get("input.fs.className"),
+ "org.apache.pinot.plugin.filesystem.S3PinotFS");
+
Assert.assertEquals(adhocTaskConfig.getTaskConfigs().get("input.fs.prop.accessKey"),
"my-access-key");
+
Assert.assertEquals(adhocTaskConfig.getTaskConfigs().get("input.fs.prop.secretKey"),
"my-secret-key");
+
Assert.assertEquals(adhocTaskConfig.getTaskConfigs().get("input.fs.prop.region"),
"us-west-2");
+ Assert.assertEquals(adhocTaskConfig.getTaskConfigs().get("inputDirURI"),
"s3://my-bucket/path/to/data/");
+ }
+}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index 8425db6475..6fe183a128 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -88,6 +88,7 @@ import
org.apache.pinot.controller.validation.OfflineSegmentIntervalChecker;
import org.apache.pinot.controller.validation.RealtimeSegmentValidationManager;
import org.apache.pinot.core.periodictask.PeriodicTask;
import org.apache.pinot.core.periodictask.PeriodicTaskScheduler;
+import org.apache.pinot.core.query.executor.sql.SqlQueryExecutor;
import org.apache.pinot.core.transport.ListenerConfig;
import org.apache.pinot.core.util.ListenerConfigUtil;
import org.apache.pinot.spi.crypt.PinotCrypterFactory;
@@ -137,6 +138,7 @@ public abstract class BaseControllerStarter implements
ServiceStartable {
protected HelixManager _helixParticipantManager;
protected PinotMetricsRegistry _metricsRegistry;
protected ControllerMetrics _controllerMetrics;
+ protected SqlQueryExecutor _sqlQueryExecutor;
// Can only be constructed after resource manager getting started
protected OfflineSegmentIntervalChecker _offlineSegmentIntervalChecker;
protected RealtimeSegmentValidationManager _realtimeSegmentValidationManager;
@@ -414,6 +416,7 @@ public abstract class BaseControllerStarter implements
ServiceStartable {
LOGGER.info("Realtime tables with High Level consumers will NOT be
supported");
_realtimeSegmentsManager = null;
}
+ _sqlQueryExecutor = new SqlQueryExecutor(_config.generateVipUrl());
// Setting up periodic tasks
List<PeriodicTask> controllerPeriodicTasks =
setupControllerPeriodicTasks();
@@ -461,6 +464,7 @@ public abstract class BaseControllerStarter implements
ServiceStartable {
bind(metadataEventNotifierFactory).to(MetadataEventNotifierFactory.class);
bind(_leadControllerManager).to(LeadControllerManager.class);
bind(_periodicTaskScheduler).to(PeriodicTaskScheduler.class);
+ bind(_sqlQueryExecutor).to(SqlQueryExecutor.class);
}
});
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
index 1c7895c0af..8dac12afbf 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
@@ -35,6 +35,7 @@ import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;
+import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
@@ -42,6 +43,7 @@ import javax.ws.rs.Path;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.HttpHeaders;
+import org.apache.calcite.sql.SqlNode;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.common.Utils;
@@ -51,11 +53,15 @@ import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.api.access.AccessControl;
import org.apache.pinot.controller.api.access.AccessControlFactory;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.core.query.executor.sql.SqlQueryExecutor;
import org.apache.pinot.pql.parsers.Pql2Compiler;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
+import org.apache.pinot.sql.parsers.CalciteSqlParser;
+import org.apache.pinot.sql.parsers.PinotSqlType;
+import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,6 +73,9 @@ public class PinotQueryResource {
private static final CalciteSqlCompiler SQL_QUERY_COMPILER = new
CalciteSqlCompiler();
private static final Random RANDOM = new Random();
+ @Inject
+ SqlQueryExecutor _sqlQueryExecutor;
+
@Inject
PinotHelixResourceManager _pinotHelixResourceManager;
@@ -135,20 +144,46 @@ public class PinotQueryResource {
queryOptions = requestJson.get("queryOptions").asText();
}
LOGGER.debug("Trace: {}, Running query: {}", traceEnabled, sqlQuery);
- return getQueryResponse(sqlQuery, traceEnabled, queryOptions,
httpHeaders, CommonConstants.Broker.Request.SQL);
+ return executeSqlQuery(httpHeaders, sqlQuery, traceEnabled,
queryOptions);
} catch (Exception e) {
LOGGER.error("Caught exception while processing post request", e);
return QueryException.getException(QueryException.INTERNAL_ERROR,
e).toString();
}
}
+ private String executeSqlQuery(@Context HttpHeaders httpHeaders, String
sqlQuery,
+ String traceEnabled, String queryOptions)
+ throws Exception {
+ SqlNodeAndOptions sqlNodeAndOptions =
CalciteSqlParser.compileToSqlNodeAndOptions(sqlQuery);
+ PinotSqlType sqlType =
CalciteSqlParser.extractSqlType(sqlNodeAndOptions.getSqlNode());
+ switch (sqlType) {
+ case DQL:
+ return getQueryResponse(sqlQuery, sqlNodeAndOptions.getSqlNode(),
traceEnabled, queryOptions, httpHeaders,
+ CommonConstants.Broker.Request.SQL);
+ case DML:
+ Map<String, String> headers =
+ httpHeaders.getRequestHeaders().entrySet().stream().filter(entry
-> !entry.getValue().isEmpty())
+ .map(entry -> Pair.of(entry.getKey(), entry.getValue().get(0)))
+ .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+ return _sqlQueryExecutor.executeDMLStatement(sqlNodeAndOptions,
headers).toJsonString();
+ default:
+ throw new UnsupportedOperationException("Unsupported SQL type - " +
sqlType);
+ }
+ }
+
@GET
@Path("sql")
public String handleGetSql(@QueryParam("sql") String sqlQuery,
@QueryParam("trace") String traceEnabled,
@QueryParam("queryOptions") String queryOptions, @Context HttpHeaders
httpHeaders) {
try {
LOGGER.debug("Trace: {}, Running query: {}", traceEnabled, sqlQuery);
- return getQueryResponse(sqlQuery, traceEnabled, queryOptions,
httpHeaders, CommonConstants.Broker.Request.SQL);
+ SqlNodeAndOptions sqlNodeAndOptions =
CalciteSqlParser.compileToSqlNodeAndOptions(sqlQuery);
+ PinotSqlType sqlType =
CalciteSqlParser.extractSqlType(sqlNodeAndOptions.getSqlNode());
+ if (sqlType == PinotSqlType.DQL) {
+ return getQueryResponse(sqlQuery, sqlNodeAndOptions.getSqlNode(),
traceEnabled, queryOptions, httpHeaders,
+ CommonConstants.Broker.Request.SQL);
+ }
+ throw new UnsupportedOperationException("Unsupported SQL type - " +
sqlType);
} catch (Exception e) {
LOGGER.error("Caught exception while processing get request", e);
return QueryException.getException(QueryException.INTERNAL_ERROR,
e).toString();
@@ -157,13 +192,20 @@ public class PinotQueryResource {
public String getQueryResponse(String query, String traceEnabled, String
queryOptions, HttpHeaders httpHeaders,
String querySyntax) {
+ return getQueryResponse(query, null, traceEnabled, queryOptions,
httpHeaders, querySyntax);
+ }
+
+ public String getQueryResponse(String query, @Nullable SqlNode sqlNode,
String traceEnabled, String queryOptions,
+ HttpHeaders httpHeaders, String querySyntax) {
// Get resource table name.
String tableName;
try {
String inputTableName;
switch (querySyntax) {
case CommonConstants.Broker.Request.SQL:
- inputTableName =
RequestUtils.getTableName(SQL_QUERY_COMPILER.compileToBrokerRequest(query).getPinotQuery());
+ inputTableName =
+ (sqlNode != null) ?
RequestUtils.getTableName(CalciteSqlParser.compileSqlNodeToPinotQuery(sqlNode))
+ :
SQL_QUERY_COMPILER.compileToBrokerRequest(query).getQuerySource().getTableName();
break;
case CommonConstants.Broker.Request.PQL:
inputTableName =
PQL_QUERY_COMPILER.compileToBrokerRequest(query).getQuerySource().getTableName();
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/sql/SqlQueryExecutor.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/sql/SqlQueryExecutor.java
new file mode 100644
index 0000000000..459b026245
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/sql/SqlQueryExecutor.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.executor.sql;
+
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.helix.HelixManager;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.minion.MinionClient;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.common.utils.helix.LeadControllerUtils;
+import org.apache.pinot.spi.config.task.AdhocTaskConfig;
+import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
+import org.apache.pinot.sql.parsers.dml.DataManipulationStatement;
+import org.apache.pinot.sql.parsers.dml.DataManipulationStatementParser;
+
+
+/**
+ * SqlQueryExecutor executes parsed SQL statements; only DML execution is implemented in this class.
+ * DML is dispatched by execution type: MINION submits an adhoc task, HTTP calls statement.execute().
+ */
+public class SqlQueryExecutor {
+ private final String _controllerUrl;
+ private final HelixManager _helixManager;
+
+ /**
+ * Resolves the lead controller through Helix for every minion request; HA is not guaranteed.
+ * @param helixManager used to look up the Helix cluster leader (the lead controller).
+ */
+ public SqlQueryExecutor(HelixManager helixManager) {
+ _helixManager = helixManager;
+ _controllerUrl = null;
+ }
+
+ /**
+ * Recommended constructor: uses a fixed controller VIP or service URL for access.
+ * @param controllerUrl controller base URL used for sending minion task
requests
+ */
+ public SqlQueryExecutor(String controllerUrl) {
+ _controllerUrl = controllerUrl;
+ _helixManager = null;
+ }
+
+ private static String getControllerBaseUrl(HelixManager helixManager) {
+ String instanceHostPort =
LeadControllerUtils.getHelixClusterLeader(helixManager);
+ if (instanceHostPort == null) {
+ throw new RuntimeException("Unable to locate the leader pinot
controller, please retry later...");
+ }
+ int index = instanceHostPort.lastIndexOf('_');
+ if (index < 0) {
+ throw new RuntimeException("Unable to parse pinot controller instance
name for " + instanceHostPort);
+ }
+ String leaderHost = instanceHostPort.substring(0, index);
+ String leaderPort = instanceHostPort.substring(index + 1);
+ return "http://" + leaderHost + ":" + leaderPort;
+ }
+
+ /**
+ * Executes a DML statement. Caught execution errors are returned as response exceptions, not thrown.
+ *
+ * @param sqlNodeAndOptions parsed DML statement together with its options
+ * @param headers extra headers forwarded with the minion task submission, may be null
+ * @return BrokerResponse holding either a result table or the caught execution exceptions
+ */
+ public BrokerResponse executeDMLStatement(SqlNodeAndOptions
sqlNodeAndOptions,
+ @Nullable Map<String, String> headers) {
+ DataManipulationStatement statement =
DataManipulationStatementParser.parse(sqlNodeAndOptions);
+ BrokerResponseNative result = new BrokerResponseNative();
+ switch (statement.getExecutionType()) {
+ case MINION:
+ AdhocTaskConfig taskConf = statement.generateAdhocTaskConfig();
+ try {
+ Map<String, String> tableToTaskIdMap =
getMinionClient().executeTask(taskConf, headers);
+ List<Object[]> rows = new ArrayList<>();
+ tableToTaskIdMap.forEach((key, value) -> rows.add(new Object[]{key,
value}));
+ result.setResultTable(new ResultTable(statement.getResultSchema(),
rows));
+ } catch (IOException e) {
+
result.setExceptions(ImmutableList.of(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR,
e)));
+ }
+ break;
+ case HTTP:
+ try {
+ result.setResultTable(new ResultTable(statement.getResultSchema(),
statement.execute()));
+ } catch (Exception e) {
+
result.setExceptions(ImmutableList.of(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR,
e)));
+ }
+ break;
+ default:
+
result.setExceptions(ImmutableList.of(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR,
+ new UnsupportedOperationException("Unsupported statement - " +
statement))));
+ break;
+ }
+ return result;
+ }
+
+ private MinionClient getMinionClient() {
+ if (_helixManager != null) {
+ return new MinionClient(getControllerBaseUrl(_helixManager));
+ }
+ return new MinionClient(_controllerUrl);
+ }
+}
diff --git
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index d3cebf1cd4..4d3933d351 100644
---
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -472,4 +472,32 @@ public abstract class ClusterTest extends ControllerTest {
return JsonUtils.stringToJsonNode(sendPostRequest(brokerBaseApiUrl +
"/query/sql", payload.toString(), headers));
}
+
+ /**
+ * Queries the sql query endpoint (/sql) of the controller at _controllerBaseApiUrl
+ */
+ protected JsonNode postQueryToController(String query)
+ throws Exception {
+ return postQueryToController(query, _controllerBaseApiUrl);
+ }
+
+ /**
+ * Queries the sql query endpoint (/sql) of the given controller, passing null for the headers
+ */
+ public static JsonNode postQueryToController(String query, String
controllerBaseApiUrl)
+ throws Exception {
+ return postQueryToController(query, controllerBaseApiUrl, null);
+ }
+
+ /**
+ * Posts the query with fixed queryOptions and the given headers to the given controller's /sql endpoint
+ */
+ public static JsonNode postQueryToController(String query, String
controllerBaseApiUrl, Map<String, String> headers)
+ throws Exception {
+ ObjectNode payload = JsonUtils.newObjectNode();
+ payload.put("sql", query);
+ payload.put("queryOptions", "groupByMode=sql;responseFormat=sql");
+ return JsonUtils.stringToJsonNode(
+ sendPostRequest(controllerBaseApiUrl + "/sql",
JsonUtils.objectToString(payload), headers));
+ }
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
index 011cd15d2c..074000b737 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
@@ -306,7 +306,7 @@ public abstract class BaseClusterIntegrationTestSet extends
BaseClusterIntegrati
// matching query
"SELECT count(*) FROM mytable",
// query that does not match any row
- "SELECT count(*) FROM mytable where
non_existing_column='non_existing_value",
+ "SELECT count(*) FROM mytable where
non_existing_column='non_existing_value'",
// query a non existing table
"SELECT count(*) FROM mytable_foo"
};
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentGenerationMinionClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentGenerationMinionClusterIntegrationTest.java
index 374bb2d617..7880d1b846 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentGenerationMinionClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentGenerationMinionClusterIntegrationTest.java
@@ -35,6 +35,7 @@ import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.util.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -106,6 +107,51 @@ public class SegmentGenerationMinionClusterIntegrationTest
extends BaseClusterIn
assertEquals(result.get("numSegmentsQueried").asInt(), 7);
}
+ @Test
+ public void testInsertIntoFromFileQueryToBroker()
+ throws Exception {
+ testInsertIntoFromFile("testInsertIntoFromFileQueryToBroker",
"testInsertIntoFromFileQueryToBrokerTask", false);
+ }
+
+ @Test
+ public void testInsertIntoFromFileQueryToController()
+ throws Exception {
+ testInsertIntoFromFile("testInsertIntoFromFileQueryToController",
"testInsertIntoFromFileQueryToControllerTask",
+ true);
+ }
+
+ private void testInsertIntoFromFile(String tableName, String taskName,
boolean queryController)
+ throws Exception {
+ addSchemaAndTableConfig(tableName);
+
+ File inputDir = new File(_tempDir, tableName);
+ int rowCnt = prepInputFiles(inputDir, 7, 10);
+ assertEquals(rowCnt, 70);
+
+ String insertFileStatement =
+ String.format("INSERT INTO %s FROM FILE '%s' OPTION(taskName=%s)",
tableName, inputDir.getAbsolutePath(),
+ taskName);
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ if (getTotalDocs(tableName) < rowCnt) {
+ JsonNode response = queryController ?
postQueryToController(insertFileStatement, _controllerBaseApiUrl)
+ : postQuery(insertFileStatement, _brokerBaseApiUrl);
+
Assert.assertEquals(response.get("resultTable").get("rows").get(0).get(0).asText(),
+ tableName + "_OFFLINE");
+
Assert.assertEquals(response.get("resultTable").get("rows").get(0).get(1).asText(),
+ "Task_SegmentGenerationAndPushTask_" + taskName);
+ }
+ return getTotalDocs(tableName) == rowCnt;
+ } catch (Exception e) {
+ LOGGER.error("Failed to get expected totalDocs: " + rowCnt, e);
+ return false;
+ }
+ }, 5000L, 600_000L, "Failed to load " + rowCnt + " documents", true);
+ JsonNode result = postQuery("SELECT COUNT(*) FROM " + tableName,
_brokerBaseApiUrl);
+ // One segment per file.
+ assertEquals(result.get("numSegmentsQueried").asInt(), 7);
+ }
+
private void addSchemaAndTableConfig(String tableName)
throws Exception {
addSchema(new
Schema.SchemaBuilder().setSchemaName(tableName).addSingleValueDimension("id",
FieldSpec.DataType.INT)
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index c908ecf314..5f6727ad2b 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -231,6 +231,8 @@ public class CommonConstants {
public static final String DISABLE_GROOVY =
"pinot.broker.disable.query.groovy";
+ public static final String CONTROLLER_URL = "pinot.broker.controller.url";
+
public static class Request {
public static final String PQL = "pql";
public static final String SQL = "sql";
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]