LakshSingla commented on code in PR #12845: URL: https://github.com/apache/druid/pull/12845#discussion_r935133698
########## sql/src/main/java/org/apache/druid/sql/SqlExecutionReporter.java: ########## @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.sql; + +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import org.apache.druid.query.QueryContext; +import org.apache.druid.query.QueryInterruptedException; +import org.apache.druid.query.QueryTimeoutException; +import org.apache.druid.server.QueryStats; +import org.apache.druid.server.RequestLogLine; +import org.apache.druid.sql.calcite.planner.PlannerContext; + +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +/** + * Side-car class which reports logs and metrics for an + * {@link HttpStatement}. This separate class cleanly separates the logic + * for running a query from the logic for reporting on that run. A query + * can end either with a success or error. 
This object is created in + * the request thread, with the remaining method called either from the + * request or response thread, but not both. + */ +public class SqlExecutionReporter +{ + private static final Logger log = new Logger(SqlExecutionReporter.class); + + private final AbstractStatement stmt; + private final String remoteAddress; + private final long startMs; + private final long startNs; + private Throwable e; + private long bytesWritten; + + public SqlExecutionReporter( + final AbstractStatement stmt, + final String remoteAddress + ) + { + this.stmt = stmt; + this.remoteAddress = remoteAddress; + this.startMs = System.currentTimeMillis(); + this.startNs = System.nanoTime(); + } + + public void failed(Throwable e) + { + this.e = e; + } + + public void succeeded(final long bytesWritten) + { + this.bytesWritten = bytesWritten; + } + + public void emit() + { + final boolean success = e == null; + final long queryTimeNs = System.nanoTime() - startNs; + + ServiceEmitter emitter = stmt.sqlToolbox.emitter; + PlannerContext plannerContext = stmt.plannerContext; + try { + ServiceMetricEvent.Builder metricBuilder = ServiceMetricEvent.builder(); + if (plannerContext != null) { + metricBuilder.setDimension("id", plannerContext.getSqlQueryId()); + metricBuilder.setDimension("nativeQueryIds", plannerContext.getNativeQueryIds().toString()); + } + if (stmt.resourceActions != null) { + metricBuilder.setDimension( + "dataSource", + stmt.resourceActions + .stream() + .map(action -> action.getResource().getName()) + .collect(Collectors.toList()) + .toString() + ); + } + metricBuilder.setDimension("remoteAddress", StringUtils.nullToEmptyNonDruidDataString(remoteAddress)); + metricBuilder.setDimension("success", String.valueOf(success)); + emitter.emit(metricBuilder.build("sqlQuery/time", TimeUnit.NANOSECONDS.toMillis(queryTimeNs))); + if (bytesWritten >= 0) { + emitter.emit(metricBuilder.build("sqlQuery/bytes", bytesWritten)); + } + + final Map<String, Object> statsMap = 
new LinkedHashMap<>(); + statsMap.put("sqlQuery/time", TimeUnit.NANOSECONDS.toMillis(queryTimeNs)); + statsMap.put("sqlQuery/bytes", bytesWritten); + statsMap.put("success", success); + QueryContext queryContext; + if (plannerContext == null) { + queryContext = stmt.queryPlus.context(); + } else { + statsMap.put("identity", plannerContext.getAuthenticationResult().getIdentity()); + queryContext = stmt.queryPlus.context(); + queryContext.addSystemParam("nativeQueryIds", plannerContext.getNativeQueryIds().toString()); + } + final Map<String, Object> context = queryContext.getMergedParams(); + statsMap.put("context", context); + if (e != null) { + statsMap.put("exception", e.toString()); + + if (e instanceof QueryInterruptedException || e instanceof QueryTimeoutException) { + statsMap.put("interrupted", true); + statsMap.put("reason", e.toString()); Review Comment: I am unfamiliar with reporting the metrics, but is there an advantage of duplicating `exception` as `reason` in case the query is interrupted? Also, in case the query is not interrupted, then should we have `statsMap.put("interrupted", false);` for symmetry? ########## sql/src/main/java/org/apache/druid/sql/DirectStatement.java: ########## @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.sql; + +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.query.QueryInterruptedException; +import org.apache.druid.sql.SqlLifecycleManager.Cancellable; +import org.apache.druid.sql.calcite.planner.DruidPlanner; +import org.apache.druid.sql.calcite.planner.PlannerResult; +import org.apache.druid.sql.calcite.planner.PrepareResult; + +import java.util.concurrent.CopyOnWriteArrayList; + +/** + * Lifecycle for direct SQL statement execution, which means that the query + * is planned and executed in a single step, with no "prepare" step. + * Callers need call only: + * <ul> + * <li>{@link #execute()} to execute the query. The caller must close + * the returned {@code Sequence}.</li> + * <li>{@link #close()} to report metrics, or {@link #closeQuietly()} + * otherwise.</li> + * </ul> + * <p> + * The {@link #cancel()} method may be called from any thread and cancels + * the query. + * <p> + * All other methods are optional and are generally for introspection. + * <p> + * The class supports two threading models. In the simple case, the same + * thread creates this object and executes the query. In the split model, + * a request thread creates this object and plans the query. A separate + * response thread consumes results and performs any desired logging, etc. + * The object is transferred between threads, with no overlapping access. Review Comment: > The object is transferred between threads, with no overlapping access. Newbie question: Does this mean that only a single thread holds ownership of an object of this class at a time? If so, then `synchronized` methods and locks are not required. 
Also, if the object can be seen from different threads, then why aren't `prepareResult` and `plannerResult` declared volatile? ########## sql/src/main/java/org/apache/druid/sql/SqlQueryPlus.java: ########## @@ -66,6 +66,25 @@ public SqlQueryPlus(final String sql, final AuthenticationResult authResult) this(sql, (QueryContext) null, null, authResult); } + public SqlQueryPlus( Review Comment: nit: Should we have a Builder for this class considering that there are multiple constructors for it filling in `null` for different parameters? ########## sql/src/main/java/org/apache/druid/sql/AbstractStatement.java: ########## @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.druid.sql; + +import org.apache.calcite.sql.parser.SqlParseException; +import org.apache.calcite.tools.ValidationException; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.query.QueryContext; +import org.apache.druid.query.QueryContexts; +import org.apache.druid.server.security.Access; +import org.apache.druid.server.security.AuthorizationUtils; +import org.apache.druid.server.security.ForbiddenException; +import org.apache.druid.server.security.ResourceAction; +import org.apache.druid.sql.calcite.planner.DruidPlanner; +import org.apache.druid.sql.calcite.planner.PlannerContext; +import org.apache.druid.sql.calcite.planner.PlannerResult; + +import java.io.Closeable; +import java.util.Set; +import java.util.UUID; +import java.util.function.Function; + +/** + * Represents a SQL statement either for preparation or execution. Review Comment: > Closing the statement emits logs and metrics for the statement. Is there any case where we would want to emit logs/metrics beforehand, for example, when an exception is thrown by one of the class's public methods? If so, is the above statement a recommendation for implementations or a binding contract? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
