gaodayue commented on a change in pull request #6402: Thread-safe QueryMetrics
URL: https://github.com/apache/incubator-druid/pull/6402#discussion_r221683930
 
 

 ##########
 File path: 
processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java
 ##########
 @@ -24,50 +24,89 @@
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.collections.bitmap.BitmapFactory;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.query.filter.Filter;
 import org.joda.time.Interval;
 
+import javax.annotation.concurrent.GuardedBy;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 /**
- * DefaultQueryMetrics is unsafe for use from multiple threads. It fails with 
RuntimeException on access not from the
- * thread where it was constructed. To "transfer" DefaultQueryMetrics from one 
thread to another {@link #ownerThread}
- * field should be updated.
+ * A basic thread-safe implementation of the {@link QueryMetrics} interface.
+ *
+ * <p><b>Note for inheritence:</b> Subclass should override {@link 
#makeCopy()} method to return
+ * an instance of that subclass. See {@link 
org.apache.druid.query.groupby.DefaultGroupByQueryMetrics}
+ * for an example.
  */
 public class DefaultQueryMetrics<QueryType extends Query<?>> implements 
QueryMetrics<QueryType>
 {
-  protected final ObjectMapper jsonMapper;
-  protected final ServiceMetricEvent.Builder builder = new 
ServiceMetricEvent.Builder();
-  protected final Map<String, Number> metrics = new HashMap<>();
+  private static final EmittingLogger log = new 
EmittingLogger(DefaultQueryMetrics.class);
 
-  /** Non final to give subclasses ability to reassign it. */
-  protected Thread ownerThread = Thread.currentThread();
+  protected final ObjectMapper jsonMapper;
+  protected final Object lock = new Object();
+  @GuardedBy("lock") private final Map<String, String> singleValueDims = new 
HashMap<>();
+  @GuardedBy("lock") private final Map<String, String[]> multiValueDims = new 
HashMap<>();
+  @GuardedBy("lock") private final Map<String, Number> metrics = new 
HashMap<>();
 
   public DefaultQueryMetrics(ObjectMapper jsonMapper)
   {
     this.jsonMapper = jsonMapper;
   }
 
-  protected void checkModifiedFromOwnerThread()
+  // copy constructor, used by makeCopy()
+  public DefaultQueryMetrics(DefaultQueryMetrics that)
   {
-    if (Thread.currentThread() != ownerThread) {
-      throw new IllegalStateException(
-          "DefaultQueryMetrics must not be modified from multiple threads. If 
it is needed to gather dimension or "
-          + "metric information from multiple threads or from an async thread, 
this information should explicitly be "
-          + "passed between threads (e. g. using Futures), or this 
DefaultQueryMetrics's ownerThread should be "
-          + "reassigned explicitly");
-    }
+    this.jsonMapper = that.jsonMapper;
+    this.singleValueDims.putAll(that.singleValueDims);
+    this.multiValueDims.putAll(that.multiValueDims);
+    this.metrics.putAll(that.metrics);
   }
 
   protected void setDimension(String dimension, String value)
   {
-    checkModifiedFromOwnerThread();
-    builder.setDimension(dimension, value);
+    synchronized (lock) {
+      String oldValue = singleValueDims.put(dimension, value);
+      if (oldValue != null && !oldValue.equals(value)) {
+        
handleIllegalModification(singleValueDims.getOrDefault(DruidMetrics.ID, ""), 
dimension);
+      }
+    }
+  }
+
+  protected void setDimensions(String dimension, String[] values)
+  {
+    synchronized (lock) {
+      String[] oldValues = multiValueDims.put(dimension, values);
+      if (oldValues != null && !Arrays.equals(oldValues, values)) {
+        
handleIllegalModification(singleValueDims.getOrDefault(DruidMetrics.ID, ""), 
dimension);
+      }
+    }
+  }
+
+  protected QueryMetrics<QueryType> reportMetric(String metricName, Number 
value)
+  {
+    synchronized (lock) {
+      Number oldValue = metrics.put(metricName, value);
+      if (oldValue != null && !oldValue.equals(value)) {
+        
handleIllegalModification(singleValueDims.getOrDefault(DruidMetrics.ID, ""), 
metricName);
+      }
+      return this;
+    }
+  }
+
+  private void handleIllegalModification(String queryId, String entryName)
+  {
+    Exception e = new Exception("stack trace");
+    log.makeAlert(e, "\"%s\" in QueryMetrics got modified to another value", 
entryName)
 
 Review comment:
   I didn't throw an exception here because I think bugs in metrics handling 
shouldn't fail the whole query. Not throwing allows users to continue querying 
with only the metrics "broken", which may be acceptable in some use cases. I admit 
that this may be controversial and would like to hear other committers' opinions.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to