Github user kkhatua commented on a diff in the pull request:
https://github.com/apache/drill/pull/1077#discussion_r159567549
--- Diff:
exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ProfileIterator.java
---
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
+import org.apache.drill.exec.server.QueryProfileStoreContext;
+import org.apache.drill.exec.server.options.QueryOptionManager;
+import org.apache.drill.exec.util.ImpersonationUtil;
+
+import javax.annotation.Nullable;
+import java.sql.Timestamp;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * DRILL-5068: Add a new system table for completed profiles
+ */
+public class ProfileIterator implements Iterator<Object> {
+ // Class-wide SLF4J logger
+ static final org.slf4j.Logger logger =
org.slf4j.LoggerFactory.getLogger(ProfileIterator.class);
+
+ // Profile store context taken from the Drillbit context in the constructor
+ private final QueryProfileStoreContext profileStoreContext;
+ // Iterator built by iterateProfileInfo(); hasNext() and next() delegate to it
+ private final Iterator<?> itr;
+ // Name of the user running the query, as reported by the fragment context
+ private final String queryingUsername;
+ // Outcome of hasAdminPrivileges(); administrators get all profiles unfiltered
+ private final boolean isAdmin;
+
+ /**
+  * Builds an iterator over the completed query profiles that the user running
+  * the current query is allowed to see.
+  *
+  * @param context fragment context supplying the Drillbit context, the query
+  *                user name and the query context
+  */
+ public ProfileIterator(FragmentContext context) {
+   // The store context, user name and admin flag must be assigned before
+   // iterateProfileInfo() is called, because that method reads all three.
+   this.profileStoreContext = context.getDrillbitContext().getProfileStoreContext();
+   this.queryingUsername = context.getQueryUserName();
+   this.isAdmin = hasAdminPrivileges(context.getQueryContext());
+   this.itr = iterateProfileInfo(context);
+ }
+
+ private boolean hasAdminPrivileges(QueryContext queryContext) {
+ if (queryContext == null) {
+ return false;
+ }
+ QueryOptionManager options = queryContext.getOptions();
+ if (queryContext.isUserAuthenticationEnabled() &&
+ !ImpersonationUtil.hasAdminPrivileges(
+ queryContext.getQueryUserName(),
+ ExecConstants.ADMIN_USERS_VALIDATOR.getAdminUsers(options),
+
ExecConstants.ADMIN_USER_GROUPS_VALIDATOR.getAdminUserGroups(options))) {
+ return false;
+ }
+
+ //Passed checks
+ return true;
+ }
+
+ /**
+  * Reads the completed-profile store and returns the profiles the querying
+  * user is authorized to see, converted to {@link ProfileInfo} objects.
+  * <p>
+  * Any exception raised while reading the store is logged (with its stack
+  * trace) and a single default {@link ProfileInfo} is returned instead.
+  *
+  * @param context fragment context of the running query (not used by this method)
+  * @return iterator over the visible profiles, or a one-element iterator holding
+  *         {@code ProfileInfo.getDefault()} when the store could not be read
+  */
+ private Iterator<ProfileInfo> iterateProfileInfo(FragmentContext context) {
+   try {
+     PersistentStore<UserBitShared.QueryProfile> profiles =
+         profileStoreContext.getCompletedProfileStore();
+
+     return transform(getAuthorizedProfiles(profiles.getAll(), queryingUsername, isAdmin));
+
+   } catch (Exception e) {
+     // The exception is the trailing argument so that SLF4J logs it as a
+     // throwable; as a surplus String.format argument it would be discarded.
+     logger.error("Unable to get persistence store: {}",
+         profileStoreContext.getProfileStoreConfig().getName(), e);
+     return Iterators.singletonIterator(ProfileInfo.getDefault());
+   }
+ }
+
+ //Returns an iterator for authorized profiles
+ private Iterator<Entry<String, QueryProfile>> getAuthorizedProfiles (
+ Iterator<Entry<String, QueryProfile>> allProfiles, String username,
boolean isAdministrator) {
+ if (isAdministrator) {
+ return allProfiles;
+ }
+
+ List<Entry<String, QueryProfile>> authorizedProfiles = new
LinkedList<Entry<String, QueryProfile>>();
+ while (allProfiles.hasNext()) {
+ Entry<String, QueryProfile> profileKVPair = allProfiles.next();
+ //Check if user matches
+ if (profileKVPair.getValue().getUser().equals(username)) {
+ authorizedProfiles.add(profileKVPair);
+ }
+ }
+ return authorizedProfiles.iterator();
+ }
+
+ /**
+ * Iterating persistentStore as a iterator of {@link
org.apache.drill.exec.store.sys.ProfileIterator.ProfileInfo}.
+ */
+ private Iterator<ProfileInfo> transform(Iterator<Map.Entry<String,
UserBitShared.QueryProfile>> all) {
+ return Iterators.transform(all, new Function<Map.Entry<String,
UserBitShared.QueryProfile>, ProfileInfo>() {
+ @Nullable
+ @Override
+ public ProfileInfo apply(@Nullable Map.Entry<String,
UserBitShared.QueryProfile> input) {
+ if (input == null || input.getValue() == null) {
+ return ProfileInfo.getDefault();
+ }
+
+ //Constructing ProfileInfo
+ final String queryID = input.getKey();
+ final QueryProfile profile = input.getValue();
+ //For cases where query was never queued
+ final long assumedQueueEndTime = profile.getQueueWaitEnd()> 0 ?
profile.getQueueWaitEnd() : profile.getPlanEnd();
+ return new ProfileInfo(
+ queryID,
+ new Timestamp(profile.getStart()),
+ profile.getForeman().getAddress(),
+ profile.getTotalFragments(),
+ profile.getUser(),
+ profile.getQueueName(),
+ computeDuration(profile.getStart(), profile.getPlanEnd()),
+ computeDuration(profile.getPlanEnd(), assumedQueueEndTime),
+ computeDuration(assumedQueueEndTime, profile.getEnd()),
+ profile.getState().name(),
+ profile.getQuery()
+ );
+ }
+ });
+ }
+
+ protected long computeDuration(long startTime, long endTime) {
+ if (endTime > startTime && startTime > 0) {
+ return (endTime - startTime);
+ } else {
+ return 0;
+ }
+ }
+
+ /** Reports whether the underlying iterator has another element. */
+ @Override
+ public boolean hasNext() {
+ return itr.hasNext();
+ }
+
+ /** Returns the next element of the underlying iterator. */
+ @Override
+ public Object next() {
+ return itr.next();
+ }
+
+ /**
+  * Removal is not supported by this iterator.
+  *
+  * @throws UnsupportedOperationException always
+  */
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ public static class ProfileInfo {
+ private static final String UnknownValue = "UNKNOWN";
--- End diff --
+1
---