[
https://issues.apache.org/jira/browse/KYLIN-3434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16579536#comment-16579536
]
ASF GitHub Bot commented on KYLIN-3434:
---------------------------------------
shaofengshi closed pull request #160: KYLIN-3434 Support prepare statement in
Kylin server side
URL: https://github.com/apache/kylin/pull/160
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 3ae6c2d48b..f4aad5e6bc 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1377,6 +1377,14 @@ public String getQueryAccessController() {
return getOptional("kylin.query.access-controller", null);
}
+ public int getQueryMaxCacheStatementNum() {
+ return
Integer.parseInt(this.getOptional("kylin.query.statement-cache-max-num",
String.valueOf(50000)));
+ }
+
+ public int getQueryMaxCacheStatementInstancePerKey() {
+ return
Integer.parseInt(this.getOptional("kylin.query.statement-cache-max-num-per-key",
String.valueOf(50)));
+ }
+
public int getDimCountDistinctMaxCardinality() {
return
Integer.parseInt(getOptional("kylin.query.max-dimension-count-distinct",
"5000000"));
}
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 3f4c576053..8d395af6a7 100755
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -264,6 +264,7 @@ public CubeInstance updateCubeStatus(CubeInstance cube,
RealizationStatusEnum ne
cube = cube.latestCopyForWrite(); // get a latest copy
CubeUpdate update = new CubeUpdate(cube);
update.setStatus(newStatus);
+ ProjectManager.getInstance(config).touchProject(cube.getProject());
return updateCube(update);
}
}
diff --git
a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
index 5122fd8401..4f1de33c2b 100644
---
a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
+++
b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
@@ -355,6 +355,18 @@ public void removeExtFilterFromProject(String filterName,
String projectName) th
save(projectInstance);
}
}
+
+ /**
+ * change the last project modify time
+ * @param projectName
+ * @throws IOException
+ */
+ public void touchProject(String projectName) throws IOException {
+ try (AutoLock lock = prjMapLock.lockForWrite()) {
+ ProjectInstance projectInstance = getProject(projectName);
+ save(projectInstance);
+ }
+ }
private ProjectInstance save(ProjectInstance prj) throws IOException {
crud.save(prj);
diff --git a/pom.xml b/pom.xml
index 854fd3b39e..1888a7f5e7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -80,6 +80,7 @@
<commons-upload.version>1.3.3</commons-upload.version>
<commons-math3.version>3.1.1</commons-math3.version>
<commons-collections.version>3.2.2</commons-collections.version>
+ <commons-pool.version>2.5.0</commons-pool.version>
<!-- Calcite deps, keep compatible with calcite.version -->
<jackson.version>2.9.5</jackson.version>
@@ -507,6 +508,11 @@
<version>${commons-collections.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-pool2</artifactId>
+ <version>${commons-pool.version}</version>
+ </dependency>
<!-- HBase2 dependencies -->
<dependency>
diff --git a/server-base/pom.xml b/server-base/pom.xml
index 455ae786f4..baa6433f7e 100644
--- a/server-base/pom.xml
+++ b/server-base/pom.xml
@@ -85,6 +85,10 @@
<groupId>net.sf.supercsv</groupId>
<artifactId>super-csv</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-pool2</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.kylin</groupId>
diff --git
a/server-base/src/main/java/org/apache/kylin/rest/request/PrepareSqlRequest.java
b/server-base/src/main/java/org/apache/kylin/rest/request/PrepareSqlRequest.java
index 97a486326a..48e382ae31 100644
---
a/server-base/src/main/java/org/apache/kylin/rest/request/PrepareSqlRequest.java
+++
b/server-base/src/main/java/org/apache/kylin/rest/request/PrepareSqlRequest.java
@@ -28,6 +28,7 @@
*
*/
public class PrepareSqlRequest extends SQLRequest {
+ private boolean enableStatementCache = true;
public PrepareSqlRequest() {
super();
@@ -43,6 +44,14 @@ public void setParams(StateParam[] params) {
this.params = params;
}
+ public boolean isEnableStatementCache() {
+ return enableStatementCache;
+ }
+
+ public void setEnableStatementCache(boolean enableStatementCache) {
+ this.enableStatementCache = enableStatementCache;
+ }
+
public static class StateParam implements Serializable {
private String className;
private String value;
diff --git
a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 4e3fe071cb..f195e74806 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -42,10 +42,15 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.NoSuchElementException;
import java.util.Set;
import javax.annotation.PostConstruct;
+import net.sf.ehcache.Cache;
+import net.sf.ehcache.CacheManager;
+import net.sf.ehcache.Element;
+
import org.apache.calcite.avatica.ColumnMetaData.Rep;
import org.apache.calcite.config.CalciteConnectionConfig;
import org.apache.calcite.jdbc.CalcitePrepare;
@@ -55,6 +60,11 @@
import org.apache.calcite.sql.type.BasicSqlType;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.commons.pool2.BaseKeyedPooledObjectFactory;
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
+import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
+import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.QueryContext;
import org.apache.kylin.common.QueryContextFacade;
@@ -113,10 +123,6 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
-import net.sf.ehcache.Cache;
-import net.sf.ehcache.CacheManager;
-import net.sf.ehcache.Element;
-
/**
* @author xduo
*/
@@ -144,11 +150,26 @@
@Autowired
private AclEvaluate aclEvaluate;
+ private GenericKeyedObjectPool<PreparedContextKey, PreparedContext>
preparedContextPool;
+
public QueryService() {
queryStore = ResourceStore.getStore(getConfig());
+ preparedContextPool = createPreparedContextPool();
badQueryDetector.start();
}
+ private GenericKeyedObjectPool<PreparedContextKey, PreparedContext>
createPreparedContextPool() {
+ PreparedContextFactory factory = new PreparedContextFactory();
+ KylinConfig kylinConfig = getConfig();
+ GenericKeyedObjectPoolConfig config = new
GenericKeyedObjectPoolConfig();
+
config.setMaxTotalPerKey(kylinConfig.getQueryMaxCacheStatementInstancePerKey());
+ config.setMaxTotal(kylinConfig.getQueryMaxCacheStatementNum());
+ config.setBlockWhenExhausted(false);
+ config.setMinEvictableIdleTimeMillis(10 * 60 * 1000L); // cached
statement will be evict if idle for 10 minutes
+ GenericKeyedObjectPool<PreparedContextKey, PreparedContext> pool = new
GenericKeyedObjectPool<>(factory, config);
+ return pool;
+ }
+
protected static void close(ResultSet resultSet, Statement stat,
Connection conn) {
OLAPContext.clearParameter();
DBUtils.closeQuietly(resultSet);
@@ -502,10 +523,13 @@ public SQLResponse searchQueryInCache(SQLRequest
sqlRequest) {
private SQLResponse queryWithSqlMassage(SQLRequest sqlRequest) throws
Exception {
Connection conn = null;
-
+ boolean isPrepareRequest = isPrepareStatementWithParams(sqlRequest);
+ boolean borrowPrepareContext = false;
+ PreparedContextKey preparedContextKey = null;
+ PreparedContext preparedContext = null;
+
try {
conn = QueryConnection.getConnection(sqlRequest.getProject());
-
String userInfo =
SecurityContextHolder.getContext().getAuthentication().getName();
QueryContext context = QueryContextFacade.current();
context.setUsername(userInfo);
@@ -539,11 +563,45 @@ private SQLResponse queryWithSqlMassage(SQLRequest
sqlRequest) throws Exception
OLAPContext.setParameters(parameters);
// force clear the query context before a new query
OLAPContext.clearThreadLocalContexts();
-
- return execute(correctedSql, sqlRequest, conn);
+
+ // special case for prepare query.
+ List<List<String>> results = Lists.newArrayList();
+ List<SelectedColumnMeta> columnMetas = Lists.newArrayList();
+ if (BackdoorToggles.getPrepareOnly()) {
+ return getPrepareOnlySqlResponse(correctedSql, conn, false,
results, columnMetas);
+ }
+ if (!isPrepareRequest) {
+ return executeRequest(correctedSql, sqlRequest, conn);
+ } else {
+ long prjLastModifyTime =
getProjectManager().getProject(sqlRequest.getProject()).getLastModified();
+ preparedContextKey = new
PreparedContextKey(sqlRequest.getProject(), prjLastModifyTime, correctedSql);
+ PrepareSqlRequest prepareSqlRequest = (PrepareSqlRequest)
sqlRequest;
+ if (prepareSqlRequest.isEnableStatementCache()) {
+ try {
+ preparedContext =
preparedContextPool.borrowObject(preparedContextKey);
+ borrowPrepareContext = true;
+ } catch (NoSuchElementException noElementException) {
+ borrowPrepareContext = false;
+ preparedContext =
createPreparedContext(sqlRequest.getProject(), sqlRequest.getSql());
+ }
+ for(OLAPContext olapContext :
preparedContext.olapContexts) {
+ OLAPContext.registerContext(olapContext);
+ }
+ } else {
+ preparedContext =
createPreparedContext(sqlRequest.getProject(), sqlRequest.getSql());
+ }
+ return executePrepareRequest(correctedSql, prepareSqlRequest,
preparedContext);
+ }
} finally {
DBUtils.closeQuietly(conn);
+ if (preparedContext != null) {
+ if (borrowPrepareContext) {
+ preparedContextPool.returnObject(preparedContextKey,
preparedContext);
+ } else {
+ preparedContext.close();
+ }
+ }
}
}
@@ -783,83 +841,100 @@ protected void processStatementAttr(Statement s,
SQLRequest sqlRequest) throws S
* @return
* @throws Exception
*/
- private SQLResponse execute(String correctedSql, SQLRequest sqlRequest,
Connection conn) throws Exception {
+ private SQLResponse executeRequest(String correctedSql, SQLRequest
sqlRequest, Connection conn) throws Exception {
Statement stat = null;
ResultSet resultSet = null;
boolean isPushDown = false;
- List<List<String>> results = Lists.newArrayList();
- List<SelectedColumnMeta> columnMetas = Lists.newArrayList();
-
+ Pair<List<List<String>>, List<SelectedColumnMeta>> r = null;
try {
+ stat = conn.createStatement();
+ processStatementAttr(stat, sqlRequest);
+ resultSet = stat.executeQuery(correctedSql);
- // special case for prepare query.
- if (BackdoorToggles.getPrepareOnly()) {
- return getPrepareOnlySqlResponse(correctedSql, conn,
isPushDown, results, columnMetas);
- }
+ r = createResponseFromResultSet(resultSet);
- if (isPrepareStatementWithParams(sqlRequest)) {
+ } catch (SQLException sqlException) {
+ r = pushDownQuery(sqlRequest, correctedSql, conn, sqlException);
+ if (r == null)
+ throw sqlException;
- stat = conn.prepareStatement(correctedSql); // to be closed in
the finally
- PreparedStatement prepared = (PreparedStatement) stat;
- processStatementAttr(prepared, sqlRequest);
- for (int i = 0; i < ((PrepareSqlRequest)
sqlRequest).getParams().length; i++) {
- setParam(prepared, i + 1, ((PrepareSqlRequest)
sqlRequest).getParams()[i]);
- }
- resultSet = prepared.executeQuery();
- } else {
- stat = conn.createStatement();
- processStatementAttr(stat, sqlRequest);
- resultSet = stat.executeQuery(correctedSql);
- }
+ isPushDown = true;
+ } finally {
+ close(resultSet, stat, null); //conn is passed in, not my duty to
close
+ }
- ResultSetMetaData metaData = resultSet.getMetaData();
- int columnCount = metaData.getColumnCount();
-
- // Fill in selected column meta
- for (int i = 1; i <= columnCount; ++i) {
- columnMetas.add(new
SelectedColumnMeta(metaData.isAutoIncrement(i), metaData.isCaseSensitive(i),
- metaData.isSearchable(i), metaData.isCurrency(i),
metaData.isNullable(i), metaData.isSigned(i),
- metaData.getColumnDisplaySize(i),
metaData.getColumnLabel(i), metaData.getColumnName(i),
- metaData.getSchemaName(i), metaData.getCatalogName(i),
metaData.getTableName(i),
- metaData.getPrecision(i), metaData.getScale(i),
metaData.getColumnType(i),
- metaData.getColumnTypeName(i), metaData.isReadOnly(i),
metaData.isWritable(i),
- metaData.isDefinitelyWritable(i)));
- }
+ return buildSqlResponse(isPushDown, r.getFirst(), r.getSecond());
+ }
- // fill in results
- while (resultSet.next()) {
- List<String> oneRow =
Lists.newArrayListWithCapacity(columnCount);
- for (int i = 0; i < columnCount; i++) {
- oneRow.add((resultSet.getString(i + 1)));
- }
+ private SQLResponse executePrepareRequest(String correctedSql,
PrepareSqlRequest sqlRequest, PreparedContext preparedContext)
+ throws Exception {
+ ResultSet resultSet = null;
+ boolean isPushDown = false;
- results.add(oneRow);
+ Pair<List<List<String>>, List<SelectedColumnMeta>> r = null;
+ Connection conn = preparedContext.conn;
+ try {
+ PreparedStatement preparedStatement =
preparedContext.preparedStatement;
+ processStatementAttr(preparedStatement, sqlRequest);
+ for (int i = 0; i < sqlRequest.getParams().length; i++) {
+ setParam(preparedStatement, i + 1,
(sqlRequest.getParams()[i]));
}
-
+ resultSet = preparedStatement.executeQuery();
+ r = createResponseFromResultSet(resultSet);
} catch (SQLException sqlException) {
- Pair<List<List<String>>, List<SelectedColumnMeta>> r = null;
- try {
- r =
PushDownUtil.tryPushDownSelectQuery(sqlRequest.getProject(), correctedSql,
conn.getSchema(),
- sqlException, BackdoorToggles.getPrepareOnly());
- } catch (Exception e2) {
- logger.error("pushdown engine failed current query too", e2);
- //exception in pushdown, throw it instead of exception in
calcite
- throw e2;
- }
-
+ r = pushDownQuery(sqlRequest, correctedSql, conn, sqlException);
if (r == null)
throw sqlException;
isPushDown = true;
- results = r.getFirst();
- columnMetas = r.getSecond();
-
} finally {
- close(resultSet, stat, null); //conn is passed in, not my duty to
close
+ DBUtils.closeQuietly(resultSet);
}
- return buildSqlResponse(isPushDown, results, columnMetas);
+ return buildSqlResponse(isPushDown, r.getFirst(), r.getSecond());
+ }
+
+ private Pair<List<List<String>>, List<SelectedColumnMeta>>
pushDownQuery(SQLRequest sqlRequest, String correctedSql, Connection conn,
SQLException sqlException) throws Exception{
+ try {
+ return
PushDownUtil.tryPushDownSelectQuery(sqlRequest.getProject(), correctedSql,
conn.getSchema(),
+ sqlException, BackdoorToggles.getPrepareOnly());
+ } catch (Exception e2) {
+ logger.error("pushdown engine failed current query too", e2);
+ //exception in pushdown, throw it instead of exception in calcite
+ throw e2;
+ }
+ }
+
+ private Pair<List<List<String>>, List<SelectedColumnMeta>>
createResponseFromResultSet(ResultSet resultSet)
+ throws Exception {
+ List<List<String>> results = Lists.newArrayList();
+ List<SelectedColumnMeta> columnMetas = Lists.newArrayList();
+
+ ResultSetMetaData metaData = resultSet.getMetaData();
+ int columnCount = metaData.getColumnCount();
+
+ // Fill in selected column meta
+ for (int i = 1; i <= columnCount; ++i) {
+ columnMetas.add(new
SelectedColumnMeta(metaData.isAutoIncrement(i), metaData.isCaseSensitive(i),
metaData
+ .isSearchable(i), metaData.isCurrency(i),
metaData.isNullable(i), metaData.isSigned(i), metaData
+ .getColumnDisplaySize(i), metaData.getColumnLabel(i),
metaData.getColumnName(i), metaData
+ .getSchemaName(i), metaData.getCatalogName(i),
metaData.getTableName(i), metaData.getPrecision(i),
+ metaData.getScale(i), metaData.getColumnType(i),
metaData.getColumnTypeName(i), metaData
+ .isReadOnly(i), metaData.isWritable(i),
metaData.isDefinitelyWritable(i)));
+ }
+
+ // fill in results
+ while (resultSet.next()) {
+ List<String> oneRow = Lists.newArrayListWithCapacity(columnCount);
+ for (int i = 0; i < columnCount; i++) {
+ oneRow.add((resultSet.getString(i + 1)));
+ }
+
+ results.add(oneRow);
+ }
+
+ return new Pair<>(results, columnMetas);
}
protected String makeErrorMsgUserFriendly(Throwable e) {
@@ -1044,6 +1119,13 @@ public void setCacheManager(CacheManager cacheManager) {
this.cacheManager = cacheManager;
}
+ private static PreparedContext createPreparedContext(String project,
String sql) throws Exception{
+ Connection conn = QueryConnection.getConnection(project);
+ PreparedStatement preparedStatement = conn.prepareStatement(sql);
+ Collection<OLAPContext> olapContexts =
OLAPContext.getThreadLocalContexts();
+ return new PreparedContext(conn, preparedStatement, olapContexts);
+ }
+
private static class QueryRecordSerializer implements
Serializer<QueryRecord> {
private static final QueryRecordSerializer serializer = new
QueryRecordSerializer();
@@ -1069,6 +1151,86 @@ public QueryRecord deserialize(DataInputStream in)
throws IOException {
}
}
+ private static class PreparedContextFactory extends
+ BaseKeyedPooledObjectFactory<PreparedContextKey, PreparedContext> {
+
+ @Override
+ public PreparedContext create(PreparedContextKey key) throws Exception
{
+ return createPreparedContext(key.project, key.sql);
+ }
+
+ @Override
+ public PooledObject<PreparedContext> wrap(PreparedContext value) {
+ return new DefaultPooledObject<>(value);
+ }
+
+ @Override
+ public void destroyObject(final PreparedContextKey key, final
PooledObject<PreparedContext> p) {
+ PreparedContext cachedContext = p.getObject();
+ cachedContext.close();
+ }
+
+ @Override
+ public boolean validateObject(final PreparedContextKey key, final
PooledObject<PreparedContext> p) {
+ return true;
+ }
+ }
+
+ private static class PreparedContextKey {
+ private String project;
+ private long prjLastModifyTime;
+ private String sql;
+
+ public PreparedContextKey(String project, long prjLastModifyTime,
String sql) {
+ this.project = project;
+ this.prjLastModifyTime = prjLastModifyTime;
+ this.sql = sql;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ PreparedContextKey that = (PreparedContextKey) o;
+
+ if (prjLastModifyTime != that.prjLastModifyTime) return false;
+ if (project != null ? !project.equals(that.project) : that.project
!= null) return false;
+ return sql != null ? sql.equals(that.sql) : that.sql == null;
+
+ }
+
+ @Override
+ public int hashCode() {
+ int result = project != null ? project.hashCode() : 0;
+ result = 31 * result + (int) (prjLastModifyTime ^
(prjLastModifyTime >>> 32));
+ result = 31 * result + (sql != null ? sql.hashCode() : 0);
+ return result;
+ }
+ }
+
+ private static class PreparedContext {
+ private Connection conn;
+ private PreparedStatement preparedStatement;
+ private Collection<OLAPContext> olapContexts;
+
+ public PreparedContext(Connection conn, PreparedStatement
preparedStatement,
+ Collection<OLAPContext> olapContexts) {
+ this.conn = conn;
+ this.preparedStatement = preparedStatement;
+ this.olapContexts = olapContexts;
+ }
+
+ public void close() {
+ if (conn != null) {
+ DBUtils.closeQuietly(conn);
+ }
+ if (preparedStatement != null) {
+ DBUtils.closeQuietly(preparedStatement);
+ }
+ }
+ }
+
}
@SuppressWarnings("serial")
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Support prepare statement in Kylin server side
> ----------------------------------------------
>
> Key: KYLIN-3434
> URL: https://issues.apache.org/jira/browse/KYLIN-3434
> Project: Kylin
> Issue Type: Improvement
> Components: Query Engine
> Reporter: Ma Gang
> Assignee: Ma Gang
> Priority: Major
>
> Kylin use calcite as sql engine, when a sql comes to Kylin server, it
> requires to be parsed, optimized, code gen, and then query Kylin's cube
> storage, the previous 3 steps often take 50-150 ms to complete(depends on the
> complexity of the sql). If we support to cache the parsed result in Kylin
> server, the 3 steps will be saved.
> The idea is to cache calcite's PreparedStatement object and related
> OLAPContexts in the server side, when the prepare request comes with the same
> sql, reuse the PreparedStatement to do the execution. Since the
> PreparedStatement is not thread safe, so I planned to use ObjectPool to cache
> the PreparedStatement.(use apache commons-pool lib)
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)