This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new b5690a7137 Adding DML definition and parse SQL InsertFile (#8557)
b5690a7137 is described below

commit b5690a7137a6b7e3313bc665104ff543b659992f
Author: Xiang Fu <[email protected]>
AuthorDate: Sat Apr 30 03:57:44 2022 -0700

    Adding DML definition and parse SQL InsertFile (#8557)
    
    * Adding DML definition and parse sql
    
    * Use MinionClient for execute async task
    
    * Add integration tests
---
 .../broker/api/resources/PinotClientRequest.java   |  45 +++++++-
 .../broker/broker/BrokerAdminApiApplication.java   |   4 +-
 .../broker/broker/helix/BaseBrokerStarter.java     |  12 +-
 .../apache/pinot/common/minion/MinionClient.java   |  21 ++++
 .../common/minion/MinionRequestURLBuilder.java     |   4 +
 .../apache/pinot/sql/parsers/CalciteSqlParser.java |  51 ++++++++-
 .../org/apache/pinot/sql/parsers/PinotSqlType.java |  30 +++++
 .../pinot/sql/parsers/SqlNodeAndOptions.java       |  41 +++++++
 .../sql/parsers/dml/DataManipulationStatement.java |  61 ++++++++++
 .../dml/DataManipulationStatementParser.java       |  37 ++++++
 .../pinot/sql/parsers/dml/InsertIntoFile.java      | 102 +++++++++++++++++
 .../pinot/sql/parsers/CalciteSqlCompilerTest.java  |   1 +
 .../pinot/sql/parsers/dml/InsertIntoFileTest.java  |  66 +++++++++++
 .../pinot/controller/BaseControllerStarter.java    |   4 +
 .../api/resources/PinotQueryResource.java          |  48 +++++++-
 .../core/query/executor/sql/SqlQueryExecutor.java  | 124 +++++++++++++++++++++
 .../pinot/integration/tests/ClusterTest.java       |  28 +++++
 .../tests/BaseClusterIntegrationTestSet.java       |   2 +-
 ...mentGenerationMinionClusterIntegrationTest.java |  46 ++++++++
 .../apache/pinot/spi/utils/CommonConstants.java    |   2 +
 20 files changed, 712 insertions(+), 17 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
index aa575c3c2b..8618beca67 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
@@ -27,6 +27,8 @@ import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiParam;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
+import java.util.HashMap;
+import java.util.Map;
 import javax.inject.Inject;
 import javax.ws.rs.GET;
 import javax.ws.rs.POST;
@@ -42,11 +44,17 @@ import javax.ws.rs.core.Response;
 import org.apache.pinot.broker.api.HttpRequesterIdentity;
 import org.apache.pinot.broker.api.RequestStatistics;
 import org.apache.pinot.broker.requesthandler.BrokerRequestHandler;
+import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.metrics.BrokerMeter;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.core.query.executor.sql.SqlQueryExecutor;
 import org.apache.pinot.spi.utils.CommonConstants.Broker.Request;
 import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.sql.parsers.CalciteSqlParser;
+import org.apache.pinot.sql.parsers.PinotSqlType;
+import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
 import org.glassfish.jersey.server.ManagedAsync;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -57,6 +65,9 @@ import org.slf4j.LoggerFactory;
 public class PinotClientRequest {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PinotClientRequest.class);
 
+  @Inject
+  SqlQueryExecutor _sqlQueryExecutor;
+
   @Inject
   private BrokerRequestHandler _requestHandler;
 
@@ -158,8 +169,7 @@ public class PinotClientRequest {
       if (debugOptions != null) {
         requestJson.put(Request.DEBUG_OPTIONS, debugOptions);
       }
-      BrokerResponse brokerResponse =
-          _requestHandler.handleRequest(requestJson, 
makeHttpIdentity(requestContext), new RequestStatistics());
+      BrokerResponse brokerResponse = executeSqlQuery(requestJson, 
makeHttpIdentity(requestContext), true);
       asyncResponse.resume(brokerResponse.toJsonString());
     } catch (Exception e) {
       LOGGER.error("Caught exception while processing GET request", e);
@@ -187,8 +197,7 @@ public class PinotClientRequest {
       String queryOptions = constructSqlQueryOptions();
       // the only query options as of now are sql related. do not allow any 
custom query options in sql endpoint
       ObjectNode sqlRequestJson = ((ObjectNode) 
requestJson).put(Request.QUERY_OPTIONS, queryOptions);
-      BrokerResponse brokerResponse =
-          _requestHandler.handleRequest(sqlRequestJson, 
makeHttpIdentity(requestContext), new RequestStatistics());
+      BrokerResponse brokerResponse = executeSqlQuery(sqlRequestJson, 
makeHttpIdentity(requestContext), false);
       asyncResponse.resume(brokerResponse.toJsonString());
     } catch (Exception e) {
       LOGGER.error("Caught exception while processing POST request", e);
@@ -197,6 +206,34 @@ public class PinotClientRequest {
     }
   }
 
+  private BrokerResponse executeSqlQuery(ObjectNode sqlRequestJson, 
HttpRequesterIdentity httpRequesterIdentity,
+      boolean onlyDql)
+      throws Exception {
+    SqlNodeAndOptions sqlNodeAndOptions;
+    try {
+      sqlNodeAndOptions = 
CalciteSqlParser.compileToSqlNodeAndOptions(sqlRequestJson.get(Request.SQL).asText());
+    } catch (Exception e) {
+      return new 
BrokerResponseNative(QueryException.getException(QueryException.PQL_PARSING_ERROR,
 e));
+    }
+    PinotSqlType sqlType = 
CalciteSqlParser.extractSqlType(sqlNodeAndOptions.getSqlNode());
+    if (onlyDql && sqlType != PinotSqlType.DQL) {
+      return new 
BrokerResponseNative(QueryException.getException(QueryException.PQL_PARSING_ERROR,
+          new UnsupportedOperationException("Unsupported SQL type - " + 
sqlType + ", GET API only supports DQL.")));
+    }
+    switch (sqlType) {
+      case DQL:
+        return _requestHandler.handleRequest(sqlRequestJson, 
httpRequesterIdentity, new RequestStatistics());
+      case DML:
+        Map<String, String> headers = new HashMap<>();
+        httpRequesterIdentity.getHttpHeaders().entries()
+            .forEach(entry -> headers.put(entry.getKey(), entry.getValue()));
+        return _sqlQueryExecutor.executeDMLStatement(sqlNodeAndOptions, 
headers);
+      default:
+        return new 
BrokerResponseNative(QueryException.getException(QueryException.PQL_PARSING_ERROR,
+            new UnsupportedOperationException("Unsupported SQL type - " + 
sqlType)));
+    }
+  }
+
   private String constructSqlQueryOptions() {
     return Request.QueryOptionKey.GROUP_BY_MODE + "=" + Request.SQL + ";" + 
Request.QueryOptionKey.RESPONSE_FORMAT + "="
         + Request.SQL;
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerAdminApiApplication.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerAdminApiApplication.java
index afd4b807eb..fdb701b9f2 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerAdminApiApplication.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerAdminApiApplication.java
@@ -27,6 +27,7 @@ import 
org.apache.pinot.broker.requesthandler.BrokerRequestHandler;
 import org.apache.pinot.broker.routing.BrokerRoutingManager;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.core.api.ServiceAutoDiscoveryFeature;
+import org.apache.pinot.core.query.executor.sql.SqlQueryExecutor;
 import org.apache.pinot.core.transport.ListenerConfig;
 import org.apache.pinot.core.util.ListenerConfigUtil;
 import org.apache.pinot.spi.env.PinotConfiguration;
@@ -48,7 +49,7 @@ public class BrokerAdminApiApplication extends ResourceConfig 
{
   private HttpServer _httpServer;
 
   public BrokerAdminApiApplication(BrokerRoutingManager routingManager, 
BrokerRequestHandler brokerRequestHandler,
-      BrokerMetrics brokerMetrics, PinotConfiguration brokerConf) {
+      BrokerMetrics brokerMetrics, PinotConfiguration brokerConf, 
SqlQueryExecutor sqlQueryExecutor) {
     packages(RESOURCE_PACKAGE);
     property(PINOT_CONFIGURATION, brokerConf);
     if 
(brokerConf.getProperty(CommonConstants.Broker.BROKER_SERVICE_AUTO_DISCOVERY, 
false)) {
@@ -57,6 +58,7 @@ public class BrokerAdminApiApplication extends ResourceConfig 
{
     register(new AbstractBinder() {
       @Override
       protected void configure() {
+        bind(sqlQueryExecutor).to(SqlQueryExecutor.class);
         bind(routingManager).to(BrokerRoutingManager.class);
         bind(brokerRequestHandler).to(BrokerRequestHandler.class);
         bind(brokerMetrics).to(BrokerMetrics.class);
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
index 852fc5ae41..8e232fb49e 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
@@ -58,6 +58,7 @@ import org.apache.pinot.common.utils.ServiceStatus;
 import org.apache.pinot.common.utils.TlsUtils;
 import org.apache.pinot.common.utils.config.TagNameUtils;
 import org.apache.pinot.common.utils.helix.HelixHelper;
+import org.apache.pinot.core.query.executor.sql.SqlQueryExecutor;
 import org.apache.pinot.core.transport.ListenerConfig;
 import org.apache.pinot.core.util.ListenerConfigUtil;
 import org.apache.pinot.spi.env.PinotConfiguration;
@@ -103,6 +104,7 @@ public abstract class BaseBrokerStarter implements 
ServiceStartable {
   protected BrokerRoutingManager _routingManager;
   protected AccessControlFactory _accessControlFactory;
   protected BrokerRequestHandler _brokerRequestHandler;
+  protected SqlQueryExecutor _sqlQueryExecutor;
   protected BrokerAdminApiApplication _brokerAdminApplication;
   protected ClusterChangeMediator _clusterChangeMediator;
   // Participant Helix manager handles Helix functionality such as state 
transitions and messages
@@ -266,10 +268,16 @@ public abstract class BaseBrokerStarter implements 
ServiceStartable {
       }
     }
     _brokerRequestHandler.start();
-
+    String controllerUrl = _brokerConf.getProperty(Broker.CONTROLLER_URL);
+    if (controllerUrl != null) {
+      _sqlQueryExecutor = new SqlQueryExecutor(controllerUrl);
+    } else {
+      _sqlQueryExecutor = new SqlQueryExecutor(_spectatorHelixManager);
+    }
     LOGGER.info("Starting broker admin application on: {}", 
ListenerConfigUtil.toString(_listenerConfigs));
     _brokerAdminApplication =
-        new BrokerAdminApiApplication(_routingManager, _brokerRequestHandler, 
_brokerMetrics, _brokerConf);
+        new BrokerAdminApiApplication(_routingManager, _brokerRequestHandler, 
_brokerMetrics, _brokerConf,
+            _sqlQueryExecutor);
     _brokerAdminApplication.start(_listenerConfigs);
 
     LOGGER.info("Initializing cluster change mediator");
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionClient.java 
b/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionClient.java
index 90b9902fc1..1909f0c8e1 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionClient.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionClient.java
@@ -27,10 +27,12 @@ import org.apache.commons.io.IOUtils;
 import org.apache.http.HttpResponse;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClientBuilder;
 import org.apache.pinot.spi.annotations.InterfaceAudience;
 import org.apache.pinot.spi.annotations.InterfaceStability;
+import org.apache.pinot.spi.config.task.AdhocTaskConfig;
 import org.apache.pinot.spi.utils.JsonUtils;
 
 
@@ -109,6 +111,25 @@ public class MinionClient {
     return responseString;
   }
 
+  public Map<String, String> executeTask(AdhocTaskConfig adhocTaskConfig, 
@Nullable Map<String, String> headers)
+      throws IOException {
+    HttpPost httpPost = 
createHttpPostRequest(MinionRequestURLBuilder.baseUrl(getControllerUrl()).forTaskExecute());
+    httpPost.setEntity(new StringEntity(adhocTaskConfig.toJsonString()));
+    if (headers != null) {
+      headers.remove("content-length");
+      headers.entrySet().forEach(entry -> httpPost.setHeader(entry.getKey(), 
entry.getValue()));
+    }
+    HttpResponse response = HTTP_CLIENT.execute(httpPost);
+    int statusCode = response.getStatusLine().getStatusCode();
+    final String responseString = 
IOUtils.toString(response.getEntity().getContent());
+    if (statusCode >= 400) {
+      throw new HttpException(String
+          .format("Unable to get tasks states map. Error code %d, Error 
message: %s", statusCode, responseString));
+    }
+    return JsonUtils.stringToObject(responseString, new 
TypeReference<Map<String, String>>() {
+    });
+  }
+
   private HttpGet createHttpGetRequest(String uri) {
     HttpGet httpGet = new HttpGet(uri);
     httpGet.setHeader(ACCEPT, APPLICATION_JSON);
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionRequestURLBuilder.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionRequestURLBuilder.java
index 35eb2b8d16..379227f387 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionRequestURLBuilder.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionRequestURLBuilder.java
@@ -90,4 +90,8 @@ public class MinionRequestURLBuilder {
   public String forTaskTypeDelete(String taskType) {
     return StringUtil.join("/", _baseUrl, "tasks", taskType);
   }
+
+  public String forTaskExecute() {
+    return StringUtil.join("/", _baseUrl, "tasks/execute");
+  }
 }
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/sql/parsers/CalciteSqlParser.java 
b/pinot-common/src/main/java/org/apache/pinot/sql/parsers/CalciteSqlParser.java
index 1a1a1ac46e..5edb31fce6 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/sql/parsers/CalciteSqlParser.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/sql/parsers/CalciteSqlParser.java
@@ -60,6 +60,7 @@ import org.apache.pinot.common.utils.request.RequestUtils;
 import org.apache.pinot.pql.parsers.pql2.ast.FilterKind;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 import org.apache.pinot.spi.utils.Pairs;
+import org.apache.pinot.sql.parsers.parser.SqlInsertFromFile;
 import org.apache.pinot.sql.parsers.parser.SqlParserImpl;
 import org.apache.pinot.sql.parsers.rewriter.QueryRewriter;
 import org.apache.pinot.sql.parsers.rewriter.QueryRewriterFactory;
@@ -107,6 +108,40 @@ public class CalciteSqlParser {
     return sql;
   }
 
+  public static SqlNodeAndOptions compileToSqlNodeAndOptions(String sql)
+      throws Exception {
+    // Remove the comments from the query
+    sql = removeComments(sql);
+
+    // Remove the terminating semicolon from the query
+    sql = removeTerminatingSemicolon(sql);
+
+    // Extract OPTION statements from sql as Calcite Parser doesn't parse it.
+    List<String> options = extractOptionsFromSql(sql);
+    if (!options.isEmpty()) {
+      sql = removeOptionsFromSql(sql);
+    }
+
+    try (StringReader inStream = new StringReader(sql)) {
+      SqlParserImpl sqlParser = newSqlParser(inStream);
+      return new SqlNodeAndOptions(sqlParser.parseSqlStmtEof(), options);
+    } catch (Throwable e) {
+      throw new SqlCompilationException("Caught exception while parsing query: 
" + sql, e);
+    }
+  }
+
+  public static PinotSqlType extractSqlType(SqlNode sqlNode) {
+    switch (sqlNode.getKind()) {
+      case OTHER_DDL:
+        if (sqlNode instanceof SqlInsertFromFile) {
+          return PinotSqlType.DML;
+        }
+        throw new SqlCompilationException("Unsupported SqlNode type - " + 
sqlNode.getKind());
+      default:
+        return PinotSqlType.DQL;
+    }
+  }
+
   public static PinotQuery compileToPinotQuery(String sql)
       throws SqlCompilationException {
     // Remove the comments from the query
@@ -324,10 +359,7 @@ public class CalciteSqlParser {
     return sqlParser;
   }
 
-  private static void setOptions(PinotQuery pinotQuery, List<String> 
optionsStatements) {
-    if (optionsStatements.isEmpty()) {
-      return;
-    }
+  public static Map<String, String> extractOptionsMap(List<String> 
optionsStatements) {
     Map<String, String> options = new HashMap<>();
     for (String optionsStatement : optionsStatements) {
       for (String option : optionsStatement.split(",")) {
@@ -338,10 +370,17 @@ public class CalciteSqlParser {
         options.put(splits[0].trim(), splits[1].trim());
       }
     }
-    pinotQuery.setQueryOptions(options);
+    return options;
+  }
+
+  private static void setOptions(PinotQuery pinotQuery, List<String> 
optionsStatements) {
+    if (optionsStatements.isEmpty()) {
+      return;
+    }
+    pinotQuery.setQueryOptions(extractOptionsMap(optionsStatements));
   }
 
-  private static PinotQuery compileSqlNodeToPinotQuery(SqlNode sqlNode) {
+  public static PinotQuery compileSqlNodeToPinotQuery(SqlNode sqlNode) {
     PinotQuery pinotQuery = new PinotQuery();
     if (sqlNode instanceof SqlExplain) {
       // Extract sql node for the query
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/sql/parsers/PinotSqlType.java 
b/pinot-common/src/main/java/org/apache/pinot/sql/parsers/PinotSqlType.java
new file mode 100644
index 0000000000..ae97409b3d
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/sql/parsers/PinotSqlType.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.sql.parsers;
+
+public enum PinotSqlType {
+  /* Data Query Language (DQL), e.g. SELECT */
+  DQL,
+  /* Data Control Language(DCL), e.g. GRANT, REVOKE */
+  DCL,
+  /* Data Manipulation Language (DML), e.g. INSERT, UPSERT, UPDATE, DELETE */
+  DML,
+  /* Data Definition Language (DDL), e.g. CREATE, DROP, ALTER, TRUNCATE */
+  DDL
+}
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/sql/parsers/SqlNodeAndOptions.java
 
b/pinot-common/src/main/java/org/apache/pinot/sql/parsers/SqlNodeAndOptions.java
new file mode 100644
index 0000000000..9e81da7378
--- /dev/null
+++ 
b/pinot-common/src/main/java/org/apache/pinot/sql/parsers/SqlNodeAndOptions.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.sql.parsers;
+
+import java.util.List;
+import org.apache.calcite.sql.SqlNode;
+
+
+public class SqlNodeAndOptions {
+  private final SqlNode _sqlNode;
+  private final List<String> _options;
+
+  public SqlNodeAndOptions(SqlNode sqlNode, List<String> options) {
+    _sqlNode = sqlNode;
+    _options = options;
+  }
+
+  public SqlNode getSqlNode() {
+    return _sqlNode;
+  }
+
+  public List<String> getOptions() {
+    return _options;
+  }
+}
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/sql/parsers/dml/DataManipulationStatement.java
 
b/pinot-common/src/main/java/org/apache/pinot/sql/parsers/dml/DataManipulationStatement.java
new file mode 100644
index 0000000000..25eb99b626
--- /dev/null
+++ 
b/pinot-common/src/main/java/org/apache/pinot/sql/parsers/dml/DataManipulationStatement.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.sql.parsers.dml;
+
+import java.util.List;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.spi.config.task.AdhocTaskConfig;
+
+
+/**
+ * DML Statement
+ */
+public interface DataManipulationStatement {
+  /**
+   * The method to execute this Statement, e.g. MINION or HTTP.
+   * @return
+   */
+  ExecutionType getExecutionType();
+
+  /**
+   * Generate minion task config for this statement.
+   * @return Adhoc minion task config
+   */
+  AdhocTaskConfig generateAdhocTaskConfig();
+
+  /**
+   * Execute the statement and format response to response row format.
+   * Not used for Minion ExecutionType.
+   * @return Result rows
+   */
+  List<Object[]> execute();
+
+  /**
+   * @return Result schema for response
+   */
+  DataSchema getResultSchema();
+
+  /**
+   * Execution method for this SQL statement.
+   */
+  enum ExecutionType {
+    HTTP,
+    MINION
+  }
+}
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/sql/parsers/dml/DataManipulationStatementParser.java
 
b/pinot-common/src/main/java/org/apache/pinot/sql/parsers/dml/DataManipulationStatementParser.java
new file mode 100644
index 0000000000..88f02998de
--- /dev/null
+++ 
b/pinot-common/src/main/java/org/apache/pinot/sql/parsers/dml/DataManipulationStatementParser.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.sql.parsers.dml;
+
+import org.apache.calcite.sql.SqlNode;
+import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
+import org.apache.pinot.sql.parsers.parser.SqlInsertFromFile;
+
+
+public class DataManipulationStatementParser {
+  private DataManipulationStatementParser() {
+  }
+
+  public static DataManipulationStatement parse(SqlNodeAndOptions 
sqlNodeAndOptions) {
+    SqlNode sqlNode = sqlNodeAndOptions.getSqlNode();
+    if (sqlNode instanceof SqlInsertFromFile) {
+      return InsertIntoFile.parse(sqlNodeAndOptions);
+    }
+    throw new UnsupportedOperationException("Unsupported DML SqlKind - " + 
sqlNode.getKind());
+  }
+}
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/sql/parsers/dml/InsertIntoFile.java
 
b/pinot-common/src/main/java/org/apache/pinot/sql/parsers/dml/InsertIntoFile.java
new file mode 100644
index 0000000000..aac4199496
--- /dev/null
+++ 
b/pinot-common/src/main/java/org/apache/pinot/sql/parsers/dml/InsertIntoFile.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.sql.parsers.dml;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.spi.config.task.AdhocTaskConfig;
+import org.apache.pinot.sql.parsers.CalciteSqlParser;
+import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
+import org.apache.pinot.sql.parsers.parser.SqlInsertFromFile;
+
+
+public class InsertIntoFile implements DataManipulationStatement {
+
+  private static final DataSchema INSERT_FROM_FILE_RESPONSE_SCHEMA =
+      new DataSchema(new String[]{"tableName", "taskJobName"},
+          new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, 
DataSchema.ColumnDataType.STRING});
+  private static final String TASK_NAME = "taskName";
+  private static final String TASK_TYPE = "taskType";
+  private static final String DEFAULT_TASK_TYPE = 
"SegmentGenerationAndPushTask";
+  private static final String INPUT_DIR_URI = "inputDirURI";
+
+  private final String _table;
+  private final Map<String, String> _queryOptions;
+
+  public InsertIntoFile(String table, Map<String, String> queryOptions) {
+    _table = table;
+    _queryOptions = queryOptions;
+  }
+
+  public String getTable() {
+    return _table;
+  }
+
+  public Map<String, String> getQueryOptions() {
+    return _queryOptions;
+  }
+
+  public static InsertIntoFile parse(SqlNodeAndOptions sqlNodeAndOptions) {
+    SqlNode sqlNode = sqlNodeAndOptions.getSqlNode();
+    assert sqlNode instanceof SqlInsertFromFile;
+    SqlInsertFromFile sqlInsertFromFile = (SqlInsertFromFile) sqlNode;
+    // operandList[0] : database name
+    // operandList[1] : table name
+    // operandList[2] : file list
+    List<SqlNode> operandList = sqlInsertFromFile.getOperandList();
+    // Set table
+    String tableName = operandList.get(0) != null ? StringUtils.joinWith(",", 
operandList.get(0), operandList.get(1))
+        : operandList.get(1).toString();
+    // Set Options
+    Map<String, String> optionsMap = 
CalciteSqlParser.extractOptionsMap(sqlNodeAndOptions.getOptions());
+    List<String> inputDirList = new ArrayList<>();
+    ((SqlNodeList) operandList.get(2)).getList()
+        .forEach(sqlNode1 -> inputDirList.add(sqlNode1.toString().replace("'", 
"")));
+    optionsMap.put(INPUT_DIR_URI, StringUtils.join(inputDirList, ","));
+    return new InsertIntoFile(tableName, optionsMap);
+  }
+
+  @Override
+  public ExecutionType getExecutionType() {
+    return ExecutionType.MINION;
+  }
+
+  @Override
+  public AdhocTaskConfig generateAdhocTaskConfig() {
+    Map<String, String> queryOptions = this.getQueryOptions();
+    String taskName = queryOptions.get(TASK_NAME);
+    String taskType = queryOptions.getOrDefault(TASK_TYPE, DEFAULT_TASK_TYPE);
+    return new AdhocTaskConfig(taskType, this.getTable(), taskName, 
queryOptions);
+  }
+
+  @Override
+  public List<Object[]> execute() {
+    throw new UnsupportedOperationException("Not supported");
+  }
+
+  @Override
+  public DataSchema getResultSchema() {
+    return INSERT_FROM_FILE_RESPONSE_SCHEMA;
+  }
+}
diff --git 
a/pinot-common/src/test/java/org/apache/pinot/sql/parsers/CalciteSqlCompilerTest.java
 
b/pinot-common/src/test/java/org/apache/pinot/sql/parsers/CalciteSqlCompilerTest.java
index 90f0d2604b..0bb6035986 100644
--- 
a/pinot-common/src/test/java/org/apache/pinot/sql/parsers/CalciteSqlCompilerTest.java
+++ 
b/pinot-common/src/test/java/org/apache/pinot/sql/parsers/CalciteSqlCompilerTest.java
@@ -2624,6 +2624,7 @@ public class CalciteSqlCompilerTest {
     String customSql = "INSERT INTO db.tbl FROM FILE 'file:///tmp/file1', FILE 
'file:///tmp/file2'";
     SqlNode sqlNode = testSqlWithCustomSqlParser(customSql);
     Assert.assertTrue(sqlNode instanceof SqlInsertFromFile);
+    Assert.assertEquals(CalciteSqlParser.extractSqlType(sqlNode), 
PinotSqlType.DML);
   }
 
   private static SqlNode testSqlWithCustomSqlParser(String sqlString) {
diff --git 
a/pinot-common/src/test/java/org/apache/pinot/sql/parsers/dml/InsertIntoFileTest.java
 
b/pinot-common/src/test/java/org/apache/pinot/sql/parsers/dml/InsertIntoFileTest.java
new file mode 100644
index 0000000000..ef688d32b5
--- /dev/null
+++ 
b/pinot-common/src/test/java/org/apache/pinot/sql/parsers/dml/InsertIntoFileTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.sql.parsers.dml;
+
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.spi.config.task.AdhocTaskConfig;
+import org.apache.pinot.sql.parsers.CalciteSqlParser;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class InsertIntoFileTest {
+
+  @Test
+  public void testInsertIntoStatementParser()
+      throws Exception {
+    String insertIntoSql = "INSERT INTO \"baseballStats\"\n"
+        + "FROM FILE 's3://my-bucket/path/to/data/'\n"
+        + "OPTION(taskName=myTask-1)\n"
+        + 
"OPTION(input.fs.className=org.apache.pinot.plugin.filesystem.S3PinotFS)\n"
+        + "OPTION(input.fs.prop.accessKey=my-access-key)\n"
+        + "OPTION(input.fs.prop.secretKey=my-secret-key)\n"
+        + "OPTION(input.fs.prop.region=us-west-2)";
+    InsertIntoFile insertIntoFile = 
InsertIntoFile.parse(CalciteSqlParser.compileToSqlNodeAndOptions(insertIntoSql));
+    Assert.assertEquals(insertIntoFile.getTable(), "baseballStats");
+    Assert.assertEquals(insertIntoFile.getExecutionType(), 
DataManipulationStatement.ExecutionType.MINION);
+    Assert.assertEquals(insertIntoFile.getResultSchema(), new DataSchema(new 
String[]{"tableName", "taskJobName"},
+        new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, 
DataSchema.ColumnDataType.STRING}));
+    Assert.assertEquals(insertIntoFile.getQueryOptions().size(), 6);
+    Assert.assertEquals(insertIntoFile.getQueryOptions().get("taskName"), 
"myTask-1");
+    
Assert.assertEquals(insertIntoFile.getQueryOptions().get("input.fs.className"),
+        "org.apache.pinot.plugin.filesystem.S3PinotFS");
+    
Assert.assertEquals(insertIntoFile.getQueryOptions().get("input.fs.prop.accessKey"),
 "my-access-key");
+    
Assert.assertEquals(insertIntoFile.getQueryOptions().get("input.fs.prop.secretKey"),
 "my-secret-key");
+    
Assert.assertEquals(insertIntoFile.getQueryOptions().get("input.fs.prop.region"),
 "us-west-2");
+    Assert.assertEquals(insertIntoFile.getQueryOptions().get("inputDirURI"), 
"s3://my-bucket/path/to/data/");
+    AdhocTaskConfig adhocTaskConfig = insertIntoFile.generateAdhocTaskConfig();
+    Assert.assertEquals(adhocTaskConfig.getTaskType(), 
"SegmentGenerationAndPushTask");
+    Assert.assertEquals(adhocTaskConfig.getTaskName(), "myTask-1");
+    Assert.assertEquals(adhocTaskConfig.getTableName(), "baseballStats");
+    Assert.assertEquals(adhocTaskConfig.getTaskConfigs().size(), 6);
+    Assert.assertEquals(adhocTaskConfig.getTaskConfigs().get("taskName"), 
"myTask-1");
+    
Assert.assertEquals(adhocTaskConfig.getTaskConfigs().get("input.fs.className"),
+        "org.apache.pinot.plugin.filesystem.S3PinotFS");
+    
Assert.assertEquals(adhocTaskConfig.getTaskConfigs().get("input.fs.prop.accessKey"),
 "my-access-key");
+    
Assert.assertEquals(adhocTaskConfig.getTaskConfigs().get("input.fs.prop.secretKey"),
 "my-secret-key");
+    
Assert.assertEquals(adhocTaskConfig.getTaskConfigs().get("input.fs.prop.region"),
 "us-west-2");
+    Assert.assertEquals(adhocTaskConfig.getTaskConfigs().get("inputDirURI"), 
"s3://my-bucket/path/to/data/");
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index 8425db6475..6fe183a128 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -88,6 +88,7 @@ import 
org.apache.pinot.controller.validation.OfflineSegmentIntervalChecker;
 import org.apache.pinot.controller.validation.RealtimeSegmentValidationManager;
 import org.apache.pinot.core.periodictask.PeriodicTask;
 import org.apache.pinot.core.periodictask.PeriodicTaskScheduler;
+import org.apache.pinot.core.query.executor.sql.SqlQueryExecutor;
 import org.apache.pinot.core.transport.ListenerConfig;
 import org.apache.pinot.core.util.ListenerConfigUtil;
 import org.apache.pinot.spi.crypt.PinotCrypterFactory;
@@ -137,6 +138,7 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
   protected HelixManager _helixParticipantManager;
   protected PinotMetricsRegistry _metricsRegistry;
   protected ControllerMetrics _controllerMetrics;
+  protected SqlQueryExecutor _sqlQueryExecutor;
   // Can only be constructed after resource manager getting started
   protected OfflineSegmentIntervalChecker _offlineSegmentIntervalChecker;
   protected RealtimeSegmentValidationManager _realtimeSegmentValidationManager;
@@ -414,6 +416,7 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
       LOGGER.info("Realtime tables with High Level consumers will NOT be 
supported");
       _realtimeSegmentsManager = null;
     }
+    _sqlQueryExecutor = new SqlQueryExecutor(_config.generateVipUrl());
 
     // Setting up periodic tasks
     List<PeriodicTask> controllerPeriodicTasks = 
setupControllerPeriodicTasks();
@@ -461,6 +464,7 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
         
bind(metadataEventNotifierFactory).to(MetadataEventNotifierFactory.class);
         bind(_leadControllerManager).to(LeadControllerManager.class);
         bind(_periodicTaskScheduler).to(PeriodicTaskScheduler.class);
+        bind(_sqlQueryExecutor).to(SqlQueryExecutor.class);
       }
     });
 
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
index 1c7895c0af..8dac12afbf 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
@@ -35,6 +35,7 @@ import java.util.Map.Entry;
 import java.util.Random;
 import java.util.Set;
 import java.util.stream.Collectors;
+import javax.annotation.Nullable;
 import javax.inject.Inject;
 import javax.ws.rs.GET;
 import javax.ws.rs.POST;
@@ -42,6 +43,7 @@ import javax.ws.rs.Path;
 import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.HttpHeaders;
+import org.apache.calcite.sql.SqlNode;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.pinot.common.Utils;
@@ -51,11 +53,15 @@ import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.api.access.AccessControl;
 import org.apache.pinot.controller.api.access.AccessControlFactory;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.core.query.executor.sql.SqlQueryExecutor;
 import org.apache.pinot.pql.parsers.Pql2Compiler;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
+import org.apache.pinot.sql.parsers.CalciteSqlParser;
+import org.apache.pinot.sql.parsers.PinotSqlType;
+import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,6 +73,9 @@ public class PinotQueryResource {
   private static final CalciteSqlCompiler SQL_QUERY_COMPILER = new 
CalciteSqlCompiler();
   private static final Random RANDOM = new Random();
 
+  @Inject
+  SqlQueryExecutor _sqlQueryExecutor;
+
   @Inject
   PinotHelixResourceManager _pinotHelixResourceManager;
 
@@ -135,20 +144,46 @@ public class PinotQueryResource {
         queryOptions = requestJson.get("queryOptions").asText();
       }
       LOGGER.debug("Trace: {}, Running query: {}", traceEnabled, sqlQuery);
-      return getQueryResponse(sqlQuery, traceEnabled, queryOptions, 
httpHeaders, CommonConstants.Broker.Request.SQL);
+      return executeSqlQuery(httpHeaders, sqlQuery, traceEnabled, 
queryOptions);
     } catch (Exception e) {
       LOGGER.error("Caught exception while processing post request", e);
       return QueryException.getException(QueryException.INTERNAL_ERROR, 
e).toString();
     }
   }
 
+  private String executeSqlQuery(@Context HttpHeaders httpHeaders, String 
sqlQuery,
+      String traceEnabled, String queryOptions)
+      throws Exception {
+    SqlNodeAndOptions sqlNodeAndOptions = 
CalciteSqlParser.compileToSqlNodeAndOptions(sqlQuery);
+    PinotSqlType sqlType = 
CalciteSqlParser.extractSqlType(sqlNodeAndOptions.getSqlNode());
+    switch (sqlType) {
+      case DQL:
+        return getQueryResponse(sqlQuery, sqlNodeAndOptions.getSqlNode(), 
traceEnabled, queryOptions, httpHeaders,
+            CommonConstants.Broker.Request.SQL);
+      case DML:
+        Map<String, String> headers =
+            httpHeaders.getRequestHeaders().entrySet().stream().filter(entry 
-> !entry.getValue().isEmpty())
+                .map(entry -> Pair.of(entry.getKey(), entry.getValue().get(0)))
+                .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+        return _sqlQueryExecutor.executeDMLStatement(sqlNodeAndOptions, 
headers).toJsonString();
+      default:
+        throw new UnsupportedOperationException("Unsupported SQL type - " + 
sqlType);
+    }
+  }
+
   @GET
   @Path("sql")
   public String handleGetSql(@QueryParam("sql") String sqlQuery, 
@QueryParam("trace") String traceEnabled,
       @QueryParam("queryOptions") String queryOptions, @Context HttpHeaders 
httpHeaders) {
     try {
       LOGGER.debug("Trace: {}, Running query: {}", traceEnabled, sqlQuery);
-      return getQueryResponse(sqlQuery, traceEnabled, queryOptions, 
httpHeaders, CommonConstants.Broker.Request.SQL);
+      SqlNodeAndOptions sqlNodeAndOptions = 
CalciteSqlParser.compileToSqlNodeAndOptions(sqlQuery);
+      PinotSqlType sqlType = 
CalciteSqlParser.extractSqlType(sqlNodeAndOptions.getSqlNode());
+      if (sqlType == PinotSqlType.DQL) {
+        return getQueryResponse(sqlQuery, sqlNodeAndOptions.getSqlNode(), 
traceEnabled, queryOptions, httpHeaders,
+            CommonConstants.Broker.Request.SQL);
+      }
+      throw new UnsupportedOperationException("Unsupported SQL type - " + 
sqlType);
     } catch (Exception e) {
       LOGGER.error("Caught exception while processing get request", e);
       return QueryException.getException(QueryException.INTERNAL_ERROR, 
e).toString();
@@ -157,13 +192,20 @@ public class PinotQueryResource {
 
  /**
   * Convenience overload without a pre-parsed {@link SqlNode}; the query string is compiled
   * internally to resolve the target table.
   */
  public String getQueryResponse(String query, String traceEnabled, String queryOptions, HttpHeaders httpHeaders,
      String querySyntax) {
    return getQueryResponse(query, null, traceEnabled, queryOptions, httpHeaders, querySyntax);
  }
+
+  public String getQueryResponse(String query, @Nullable SqlNode sqlNode, 
String traceEnabled, String queryOptions,
+      HttpHeaders httpHeaders, String querySyntax) {
     // Get resource table name.
     String tableName;
     try {
       String inputTableName;
       switch (querySyntax) {
         case CommonConstants.Broker.Request.SQL:
-          inputTableName = 
RequestUtils.getTableName(SQL_QUERY_COMPILER.compileToBrokerRequest(query).getPinotQuery());
+          inputTableName =
+              (sqlNode != null) ? 
RequestUtils.getTableName(CalciteSqlParser.compileSqlNodeToPinotQuery(sqlNode))
+                  : 
SQL_QUERY_COMPILER.compileToBrokerRequest(query).getQuerySource().getTableName();
           break;
         case CommonConstants.Broker.Request.PQL:
           inputTableName = 
PQL_QUERY_COMPILER.compileToBrokerRequest(query).getQuerySource().getTableName();
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/sql/SqlQueryExecutor.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/sql/SqlQueryExecutor.java
new file mode 100644
index 0000000000..459b026245
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/sql/SqlQueryExecutor.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.executor.sql;
+
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.helix.HelixManager;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.minion.MinionClient;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.common.utils.helix.LeadControllerUtils;
+import org.apache.pinot.spi.config.task.AdhocTaskConfig;
+import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
+import org.apache.pinot.sql.parsers.dml.DataManipulationStatement;
+import org.apache.pinot.sql.parsers.dml.DataManipulationStatementParser;
+
+
+/**
+ * SqlQueryExecutor executes all SQL queries including DQL, DML, DCL, DDL.
+ *
+ */
+public class SqlQueryExecutor {
+  private final String _controllerUrl;
+  private final HelixManager _helixManager;
+
+  /**
+   * Fetch the lead controller from helix, HA is not guaranteed.
+   * @param helixManager is used to query leader controller from helix.
+   */
+  public SqlQueryExecutor(HelixManager helixManager) {
+    _helixManager = helixManager;
+    _controllerUrl = null;
+  }
+
+  /**
+   * Recommended to provide the controller vip or service name for access.
+   * @param controllerUrl controller service name for sending minion task 
requests
+   */
+  public SqlQueryExecutor(String controllerUrl) {
+    _controllerUrl = controllerUrl;
+    _helixManager = null;
+  }
+
+  private static String getControllerBaseUrl(HelixManager helixManager) {
+    String instanceHostPort = 
LeadControllerUtils.getHelixClusterLeader(helixManager);
+    if (instanceHostPort == null) {
+      throw new RuntimeException("Unable to locate the leader pinot 
controller, please retry later...");
+    }
+    int index = instanceHostPort.lastIndexOf('_');
+    if (index < 0) {
+      throw new RuntimeException("Unable to parse pinot controller instance 
name for " + instanceHostPort);
+    }
+    String leaderHost = instanceHostPort.substring(0, index);
+    String leaderPort = instanceHostPort.substring(index + 1);
+    return "http://"; + leaderHost + ":" + leaderPort;
+  }
+
+  /**
+   * Execute DML Statement
+   *
+   * @param sqlNodeAndOptions Parsed DML object
+   * @param headers extra headers map for minion task submission
+   * @return BrokerResponse is the DML executed response
+   */
+  public BrokerResponse executeDMLStatement(SqlNodeAndOptions 
sqlNodeAndOptions,
+      @Nullable Map<String, String> headers) {
+    DataManipulationStatement statement = 
DataManipulationStatementParser.parse(sqlNodeAndOptions);
+    BrokerResponseNative result = new BrokerResponseNative();
+    switch (statement.getExecutionType()) {
+      case MINION:
+        AdhocTaskConfig taskConf = statement.generateAdhocTaskConfig();
+        try {
+          Map<String, String> tableToTaskIdMap = 
getMinionClient().executeTask(taskConf, headers);
+          List<Object[]> rows = new ArrayList<>();
+          tableToTaskIdMap.forEach((key, value) -> rows.add(new Object[]{key, 
value}));
+          result.setResultTable(new ResultTable(statement.getResultSchema(), 
rows));
+        } catch (IOException e) {
+          
result.setExceptions(ImmutableList.of(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR,
 e)));
+        }
+        break;
+      case HTTP:
+        try {
+          result.setResultTable(new ResultTable(statement.getResultSchema(), 
statement.execute()));
+        } catch (Exception e) {
+          
result.setExceptions(ImmutableList.of(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR,
 e)));
+        }
+        break;
+      default:
+        
result.setExceptions(ImmutableList.of(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR,
+            new UnsupportedOperationException("Unsupported statement - " + 
statement))));
+        break;
+    }
+    return result;
+  }
+
+  private MinionClient getMinionClient() {
+    if (_helixManager != null) {
+      return new MinionClient(getControllerBaseUrl(_helixManager));
+    }
+    return new MinionClient(_controllerUrl);
+  }
+}
diff --git 
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
 
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index d3cebf1cd4..4d3933d351 100644
--- 
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++ 
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -472,4 +472,32 @@ public abstract class ClusterTest extends ControllerTest {
 
     return JsonUtils.stringToJsonNode(sendPostRequest(brokerBaseApiUrl + 
"/query/sql", payload.toString(), headers));
   }
+
  /**
   * Queries the controller's sql query endpoint (/sql) using this test's controller base URL.
   * (The original comment said /query/sql, but the delegate posts to {base}/sql.)
   */
  protected JsonNode postQueryToController(String query)
      throws Exception {
    return postQueryToController(query, _controllerBaseApiUrl);
  }
+
  /**
   * Queries the controller's sql query endpoint (/sql) with no extra HTTP headers.
   */
  public static JsonNode postQueryToController(String query, String controllerBaseApiUrl)
      throws Exception {
    return postQueryToController(query, controllerBaseApiUrl, null);
  }
+
+  /**
+   * Queries the controller's sql query endpoint (/sql)
+   */
+  public static JsonNode postQueryToController(String query, String 
controllerBaseApiUrl, Map<String, String> headers)
+      throws Exception {
+    ObjectNode payload = JsonUtils.newObjectNode();
+    payload.put("sql", query);
+    payload.put("queryOptions", "groupByMode=sql;responseFormat=sql");
+    return JsonUtils.stringToJsonNode(
+        sendPostRequest(controllerBaseApiUrl + "/sql", 
JsonUtils.objectToString(payload), headers));
+  }
 }
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
index 011cd15d2c..074000b737 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
@@ -306,7 +306,7 @@ public abstract class BaseClusterIntegrationTestSet extends 
BaseClusterIntegrati
         // matching query
         "SELECT count(*) FROM mytable",
         // query that does not match any row
-        "SELECT count(*) FROM mytable where 
non_existing_column='non_existing_value",
+        "SELECT count(*) FROM mytable where 
non_existing_column='non_existing_value'",
         // query a non existing table
         "SELECT count(*) FROM mytable_foo"
     };
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentGenerationMinionClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentGenerationMinionClusterIntegrationTest.java
index 374bb2d617..7880d1b846 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentGenerationMinionClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentGenerationMinionClusterIntegrationTest.java
@@ -35,6 +35,7 @@ import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.util.TestUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -106,6 +107,51 @@ public class SegmentGenerationMinionClusterIntegrationTest 
extends BaseClusterIn
     assertEquals(result.get("numSegmentsQueried").asInt(), 7);
   }
 
  @Test
  public void testInsertIntoFromFileQueryToBroker()
      throws Exception {
    // Submits the INSERT INTO ... FROM FILE statement via the broker query endpoint.
    testInsertIntoFromFile("testInsertIntoFromFileQueryToBroker", "testInsertIntoFromFileQueryToBrokerTask", false);
  }
+
  @Test
  public void testInsertIntoFromFileQueryToController()
      throws Exception {
    // Submits the INSERT INTO ... FROM FILE statement via the controller query endpoint.
    testInsertIntoFromFile("testInsertIntoFromFileQueryToController", "testInsertIntoFromFileQueryToControllerTask",
        true);
  }
+
  /**
   * End-to-end check for INSERT INTO ... FROM FILE: prepares local input files, submits the
   * statement (to the controller or the broker), polls until all rows are queryable, then
   * verifies that one segment was produced per input file.
   */
  private void testInsertIntoFromFile(String tableName, String taskName, boolean queryController)
      throws Exception {
    addSchemaAndTableConfig(tableName);

    // 7 input files x 10 rows each = 70 total rows.
    File inputDir = new File(_tempDir, tableName);
    int rowCnt = prepInputFiles(inputDir, 7, 10);
    assertEquals(rowCnt, 70);

    String insertFileStatement =
        String.format("INSERT INTO %s FROM FILE '%s' OPTION(taskName=%s)", tableName, inputDir.getAbsolutePath(),
            taskName);
    TestUtils.waitForCondition(aVoid -> {
      try {
        // Re-submit while docs are still loading and assert the response echoes the
        // offline table name and the generated task job name.
        // NOTE(review): this assumes re-submitting with the same taskName returns the same
        // task identifier (idempotent submission) — confirm against minion task semantics.
        if (getTotalDocs(tableName) < rowCnt) {
          JsonNode response = queryController ? postQueryToController(insertFileStatement, _controllerBaseApiUrl)
              : postQuery(insertFileStatement, _brokerBaseApiUrl);
          Assert.assertEquals(response.get("resultTable").get("rows").get(0).get(0).asText(),
              tableName + "_OFFLINE");
          Assert.assertEquals(response.get("resultTable").get("rows").get(0).get(1).asText(),
              "Task_SegmentGenerationAndPushTask_" + taskName);
        }
        return getTotalDocs(tableName) == rowCnt;
      } catch (Exception e) {
        LOGGER.error("Failed to get expected totalDocs: " + rowCnt, e);
        return false;
      }
    }, 5000L, 600_000L, "Failed to load " + rowCnt + " documents", true);
    JsonNode result = postQuery("SELECT COUNT(*) FROM " + tableName, _brokerBaseApiUrl);
    // One segment per file.
    assertEquals(result.get("numSegmentsQueried").asInt(), 7);
  }
+
   private void addSchemaAndTableConfig(String tableName)
       throws Exception {
     addSchema(new 
Schema.SchemaBuilder().setSchemaName(tableName).addSingleValueDimension("id", 
FieldSpec.DataType.INT)
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index c908ecf314..5f6727ad2b 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -231,6 +231,8 @@ public class CommonConstants {
 
     public static final String DISABLE_GROOVY = 
"pinot.broker.disable.query.groovy";
 
+    public static final String CONTROLLER_URL = "pinot.broker.controller.url";
+
     public static class Request {
       public static final String PQL = "pql";
       public static final String SQL = "sql";


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to