klcopp commented on a change in pull request #2563:
URL: https://github.com/apache/hive/pull/2563#discussion_r686691938
##########
File path:
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -136,67 +133,77 @@ public static synchronized void init(HiveConf conf)
throws Exception {
private void configure(HiveConf conf) throws Exception {
acidMetricsExtEnabled = MetastoreConf.getBoolVar(conf,
MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON);
+ if (acidMetricsExtEnabled) {
- deltasThreshold = HiveConf.getIntVar(conf,
HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_DELTA_NUM_THRESHOLD);
- obsoleteDeltasThreshold = HiveConf.getIntVar(conf,
HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_OBSOLETE_DELTA_NUM_THRESHOLD);
-
- initCachesForMetrics(conf);
- initObjectsForMetrics();
+ initCachesForMetrics(conf);
+ initObjectsForMetrics();
- long reportingInterval = HiveConf.getTimeVar(conf,
- HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_REPORTING_INTERVAL,
TimeUnit.SECONDS);
+ long reportingInterval =
+ HiveConf.getTimeVar(conf,
HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_REPORTING_INTERVAL, TimeUnit.SECONDS);
- ThreadFactory threadFactory =
- new ThreadFactoryBuilder()
- .setDaemon(true)
- .setNameFormat("DeltaFilesMetricReporter %d")
- .build();
- executorService =
Executors.newSingleThreadScheduledExecutor(threadFactory);
- executorService.scheduleAtFixedRate(
- new ReportingTask(), 0, reportingInterval, TimeUnit.SECONDS);
+ ThreadFactory threadFactory =
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("DeltaFilesMetricReporter %d").build();
+ executorService =
Executors.newSingleThreadScheduledExecutor(threadFactory);
+ executorService.scheduleAtFixedRate(new ReportingTask(), 0,
reportingInterval, TimeUnit.SECONDS);
- LOG.info("Started DeltaFilesMetricReporter thread");
+ LOG.info("Started DeltaFilesMetricReporter thread");
+ }
}
public void submit(TezCounters counters) {
if (acidMetricsExtEnabled) {
- updateMetrics(NUM_OBSOLETE_DELTAS,
- obsoleteDeltaCache, obsoleteDeltaTopN, obsoleteDeltasThreshold,
- counters);
- updateMetrics(NUM_DELTAS,
- deltaCache, deltaTopN, deltasThreshold,
- counters);
- updateMetrics(NUM_SMALL_DELTAS,
- smallDeltaCache, smallDeltaTopN, deltasThreshold,
- counters);
+ updateMetrics(NUM_OBSOLETE_DELTAS, obsoleteDeltaCache,
obsoleteDeltaTopN, counters);
+ updateMetrics(NUM_DELTAS, deltaCache, deltaTopN, counters);
+ updateMetrics(NUM_SMALL_DELTAS, smallDeltaCache, smallDeltaTopN,
counters);
}
}
+ /**
+ * Copy counters to caches.
+ */
private void updateMetrics(DeltaFilesMetricType metric, Cache<String,
Integer> cache, Queue<Pair<String, Integer>> topN,
- int threshold, TezCounters counters) {
- counters.getGroup(metric.value).forEach(counter -> {
- Integer prev = cache.getIfPresent(counter.getName());
- if (prev != null && prev != counter.getValue()) {
- cache.invalidate(counter.getName());
+ TezCounters counters) {
+ try {
+ CounterGroup group = counters.getGroup(metric.value);
+ // if the group is empty, clear the cache
+ if (group.size() == 0) {
+ cache.invalidateAll();
+ } else {
+ // if there is no counter corresponding to a cache entry, remove from cache
+ ConcurrentMap<String, Integer> cacheMap = cache.asMap();
Review comment:
As discussed offline, we will also collect the input ReadEntities and update
the metrics based on those entities.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]