klcopp commented on a change in pull request #2563:
URL: https://github.com/apache/hive/pull/2563#discussion_r685133970
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -136,67 +133,77 @@ public static synchronized void init(HiveConf conf) throws Exception {
private void configure(HiveConf conf) throws Exception {
acidMetricsExtEnabled = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON);
+ if (acidMetricsExtEnabled) {
Review comment:
Technically not, but since it's a singleton the instance is created no matter what, right?
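For context, a minimal sketch of the eager-singleton behavior being discussed; `Reporter`, `getInstance` and `configure` are hypothetical stand-ins, not the actual DeltaFilesMetricReporter code:

```java
// Hypothetical sketch: with an eagerly created singleton, the instance is
// built no matter what; the feature flag only gates what configure() wires up.
final class Reporter {
  private static final Reporter INSTANCE = new Reporter();  // created unconditionally

  private boolean enabled;

  private Reporter() { }

  static Reporter getInstance() {
    return INSTANCE;
  }

  void configure(boolean acidMetricsExtEnabled) {
    enabled = acidMetricsExtEnabled;
    if (enabled) {
      // start caches, the reporting executor, etc. only when the flag is on
    }
  }
}
```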
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -219,8 +226,33 @@ public static void mergeDeltaFilesStats(AcidDirectory dir, long checkThresholdIn
logDeltaDirMetrics(dir, conf, numObsoleteDeltas, numDeltas, numSmallDeltas);
String path = getRelPath(dir);
- newDeltaFilesStats(numObsoleteDeltas, numDeltas, numSmallDeltas)
-     .forEach((type, cnt) -> deltaFilesStats.computeIfAbsent(type, v -> new HashMap<>()).put(path, cnt));
+
+ filterAndAddToDeltaFilesStats(NUM_DELTAS, numDeltas, deltasThreshold, deltaFilesStats, path, maxCacheSize);
+ filterAndAddToDeltaFilesStats(NUM_OBSOLETE_DELTAS, numObsoleteDeltas, obsoleteDeltasThreshold, deltaFilesStats,
+     path, maxCacheSize);
+ filterAndAddToDeltaFilesStats(NUM_SMALL_DELTAS, numSmallDeltas, deltasThreshold, deltaFilesStats,
+     path, maxCacheSize);
+ }
+
+ /**
+ * Add partition and delta count to deltaFilesStats if the delta count is over the recording threshold and it is in
+ * the top {@link HiveConf.ConfVars#HIVE_TXN_ACID_METRICS_MAX_CACHE_SIZE} deltas.
+ */
+ private static void filterAndAddToDeltaFilesStats(DeltaFilesMetricType type, int deltaCount, int deltasThreshold,
+     EnumMap<DeltaFilesMetricType, Queue<Pair<String, Integer>>> deltaFilesStats, String path, int maxCacheSize) {
+ if (deltaCount > deltasThreshold) {
+ Queue<Pair<String,Integer>> pairQueue = deltaFilesStats.get(type);
+ if (pairQueue != null && pairQueue.size() == maxCacheSize) {
+ Pair<String, Integer> lowest = pairQueue.peek();
+ if (lowest != null && deltaCount > lowest.getValue()) {
+ pairQueue.poll();
+ }
+ }
+ if (pairQueue == null || pairQueue.size() < maxCacheSize) {
+ deltaFilesStats.computeIfAbsent(type,
+     v -> (new PriorityBlockingQueue<>(maxCacheSize, getComparator()))).add(Pair.of(path, deltaCount));
Review comment:
No, I can use a PriorityQueue instead
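For illustration, a minimal sketch of the same top-N filtering on a plain (non-thread-safe) `PriorityQueue`; `TopN` and `offer` are hypothetical names, and this assumes single-threaded access:

```java
import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.Queue;

import org.apache.commons.lang3.tuple.Pair;

// Hypothetical top-N keeper: a min-heap ordered by delta count keeps the
// smallest entry at the head, so it is the first candidate for eviction.
final class TopN {
  private final int cap;
  private final Queue<Pair<String, Integer>> heap;

  TopN(int cap) {
    this.cap = cap;
    Comparator<Pair<String, Integer>> byCount = Comparator.comparingInt(Pair::getValue);
    this.heap = new PriorityQueue<>(cap, byCount);
  }

  void offer(String path, int deltaCount) {
    Pair<String, Integer> lowest = heap.peek();
    if (heap.size() == cap && lowest != null && deltaCount > lowest.getValue()) {
      heap.poll();  // evict the current minimum to make room
    }
    if (heap.size() < cap) {
      heap.add(Pair.of(path, deltaCount));
    }
  }
}
```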
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -136,67 +133,77 @@ public static synchronized void init(HiveConf conf) throws Exception {
private void configure(HiveConf conf) throws Exception {
acidMetricsExtEnabled = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON);
+ if (acidMetricsExtEnabled) {
- deltasThreshold = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_DELTA_NUM_THRESHOLD);
- obsoleteDeltasThreshold = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_OBSOLETE_DELTA_NUM_THRESHOLD);
-
- initCachesForMetrics(conf);
- initObjectsForMetrics();
+ initCachesForMetrics(conf);
+ initObjectsForMetrics();
- long reportingInterval = HiveConf.getTimeVar(conf,
- HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_REPORTING_INTERVAL, TimeUnit.SECONDS);
+ long reportingInterval =
+ HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_REPORTING_INTERVAL, TimeUnit.SECONDS);
- ThreadFactory threadFactory =
- new ThreadFactoryBuilder()
- .setDaemon(true)
- .setNameFormat("DeltaFilesMetricReporter %d")
- .build();
- executorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
- executorService.scheduleAtFixedRate(
- new ReportingTask(), 0, reportingInterval, TimeUnit.SECONDS);
+ ThreadFactory threadFactory =
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("DeltaFilesMetricReporter %d").build();
+ executorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
+ executorService.scheduleAtFixedRate(new ReportingTask(), 0, reportingInterval, TimeUnit.SECONDS);
- LOG.info("Started DeltaFilesMetricReporter thread");
+ LOG.info("Started DeltaFilesMetricReporter thread");
+ }
}
public void submit(TezCounters counters) {
if (acidMetricsExtEnabled) {
- updateMetrics(NUM_OBSOLETE_DELTAS,
- obsoleteDeltaCache, obsoleteDeltaTopN, obsoleteDeltasThreshold,
- counters);
- updateMetrics(NUM_DELTAS,
- deltaCache, deltaTopN, deltasThreshold,
- counters);
- updateMetrics(NUM_SMALL_DELTAS,
- smallDeltaCache, smallDeltaTopN, deltasThreshold,
- counters);
+ updateMetrics(NUM_OBSOLETE_DELTAS, obsoleteDeltaCache, obsoleteDeltaTopN, counters);
+ updateMetrics(NUM_DELTAS, deltaCache, deltaTopN, counters);
+ updateMetrics(NUM_SMALL_DELTAS, smallDeltaCache, smallDeltaTopN, counters);
}
}
+ /**
+ * Copy counters to caches.
+ */
private void updateMetrics(DeltaFilesMetricType metric, Cache<String, Integer> cache, Queue<Pair<String, Integer>> topN,
- int threshold, TezCounters counters) {
- counters.getGroup(metric.value).forEach(counter -> {
- Integer prev = cache.getIfPresent(counter.getName());
- if (prev != null && prev != counter.getValue()) {
- cache.invalidate(counter.getName());
+ TezCounters counters) {
+ try {
+ CounterGroup group = counters.getGroup(metric.value);
+ // if the group is empty, clear the cache
+ if (group.size() == 0) {
+ cache.invalidateAll();
+ } else {
+ // if there is no counter corresponding to a cache entry, remove from cache
+ ConcurrentMap<String, Integer> cacheMap = cache.asMap();
Review comment:
I see what you mean, but I'm not sure how to update the cache then. For example, my_table had 200 deltas, then compaction happened and now it has 0.
- Before this change: there would be a counter with my_table=200, then after compaction a counter with my_table/my_partition=0, and we would invalidate my_table from the cache and not re-add it.
- After this change: there would be a counter with my_table=200, then after compaction there would be no counter at all (because 0 < min threshold, and because my_table probably won't be in the topN tables).
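To make the trade-off concrete, here is a toy, self-contained demo (hypothetical data; plain HashMaps stand in for the Guava cache and the Tez counter group):

```java
import java.util.HashMap;
import java.util.Map;

// Toy demo: entries below the threshold (or outside the topN) stop showing
// up as counters, so "absent from the counters" is ambiguous.
public class StaleCacheDemo {
  public static void main(String[] args) {
    Map<String, Integer> cache = new HashMap<>();
    cache.put("my_table", 200);         // remembered from an earlier query
    cache.put("untouched_table", 150);  // the current query never read it

    Map<String, Integer> counters = new HashMap<>();
    counters.put("other_table", 50);    // my_table compacted to 0 deltas, so no counter

    // Copy fresh counter values into the cache...
    cache.putAll(counters);
    // ...then drop cache entries with no matching counter: this clears the
    // stale my_table=200, but also evicts untouched_table.
    cache.keySet().removeIf(path -> !counters.containsKey(path));

    System.out.println(cache);  // {other_table=50}
  }
}
```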
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -136,67 +133,77 @@ public static synchronized void init(HiveConf conf) throws Exception {
private void configure(HiveConf conf) throws Exception {
acidMetricsExtEnabled = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON);
+ if (acidMetricsExtEnabled) {
- deltasThreshold = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_DELTA_NUM_THRESHOLD);
- obsoleteDeltasThreshold = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_OBSOLETE_DELTA_NUM_THRESHOLD);
-
- initCachesForMetrics(conf);
- initObjectsForMetrics();
+ initCachesForMetrics(conf);
+ initObjectsForMetrics();
- long reportingInterval = HiveConf.getTimeVar(conf,
- HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_REPORTING_INTERVAL, TimeUnit.SECONDS);
+ long reportingInterval =
+ HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_REPORTING_INTERVAL, TimeUnit.SECONDS);
- ThreadFactory threadFactory =
- new ThreadFactoryBuilder()
- .setDaemon(true)
- .setNameFormat("DeltaFilesMetricReporter %d")
- .build();
- executorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
- executorService.scheduleAtFixedRate(
- new ReportingTask(), 0, reportingInterval, TimeUnit.SECONDS);
+ ThreadFactory threadFactory =
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("DeltaFilesMetricReporter %d").build();
+ executorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
+ executorService.scheduleAtFixedRate(new ReportingTask(), 0, reportingInterval, TimeUnit.SECONDS);
- LOG.info("Started DeltaFilesMetricReporter thread");
+ LOG.info("Started DeltaFilesMetricReporter thread");
+ }
}
public void submit(TezCounters counters) {
if (acidMetricsExtEnabled) {
- updateMetrics(NUM_OBSOLETE_DELTAS,
- obsoleteDeltaCache, obsoleteDeltaTopN, obsoleteDeltasThreshold,
- counters);
- updateMetrics(NUM_DELTAS,
- deltaCache, deltaTopN, deltasThreshold,
- counters);
- updateMetrics(NUM_SMALL_DELTAS,
- smallDeltaCache, smallDeltaTopN, deltasThreshold,
- counters);
+ updateMetrics(NUM_OBSOLETE_DELTAS, obsoleteDeltaCache, obsoleteDeltaTopN, counters);
+ updateMetrics(NUM_DELTAS, deltaCache, deltaTopN, counters);
+ updateMetrics(NUM_SMALL_DELTAS, smallDeltaCache, smallDeltaTopN, counters);
}
}
+ /**
+ * Copy counters to caches.
+ */
private void updateMetrics(DeltaFilesMetricType metric, Cache<String, Integer> cache, Queue<Pair<String, Integer>> topN,
- int threshold, TezCounters counters) {
- counters.getGroup(metric.value).forEach(counter -> {
- Integer prev = cache.getIfPresent(counter.getName());
- if (prev != null && prev != counter.getValue()) {
- cache.invalidate(counter.getName());
+ TezCounters counters) {
+ try {
+ CounterGroup group = counters.getGroup(metric.value);
+ // if the group is empty, clear the cache
+ if (group.size() == 0) {
+ cache.invalidateAll();
+ } else {
+ // if there is no counter corresponding to a cache entry, remove from cache
+ ConcurrentMap<String, Integer> cacheMap = cache.asMap();
Review comment:
I guess the issue is: we don't know if a partition is missing from the counters because the query didn't touch it, or because the number of deltas has decreased.
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -267,32 +299,26 @@ private static String getRelPath(AcidUtils.Directory directory) {
directory.getPath().getName();
}
- private static EnumMap<DeltaFilesMetricType, Integer> newDeltaFilesStats(int numObsoleteDeltas, int numDeltas, int numSmallDeltas) {
- return new EnumMap<DeltaFilesMetricType, Integer>(DeltaFilesMetricType.class) {{
- put(NUM_OBSOLETE_DELTAS, numObsoleteDeltas);
- put(NUM_DELTAS, numDeltas);
- put(NUM_SMALL_DELTAS, numSmallDeltas);
- }};
- }
-
public static void createCountersForAcidMetrics(TezCounters tezCounters, JobConf jobConf) {
- if (HiveConf.getBoolVar(jobConf, HiveConf.ConfVars.HIVE_SERVER2_METRICS_ENABLED) &&
-     MetastoreConf.getBoolVar(jobConf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON)) {
+ try {
+ if (HiveConf.getBoolVar(jobConf, HiveConf.ConfVars.HIVE_SERVER2_METRICS_ENABLED) && MetastoreConf
+     .getBoolVar(jobConf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON)) {
- Arrays.stream(DeltaFilesMetricType.values())
- .filter(type -> jobConf.get(type.name()) != null)
- .forEach(type ->
-     Splitter.on(',').withKeyValueSeparator("->").split(jobConf.get(type.name())).forEach(
-         (path, cnt) -> tezCounters.findCounter(type.value, path).setValue(Long.parseLong(cnt))
- )
- );
+ Arrays.stream(DeltaFilesMetricType.values()).filter(type -> jobConf.get(type.name()) != null).forEach(
+     type -> Splitter.on(',').withKeyValueSeparator("->").split(jobConf.get(type.name()))
+         .forEach((path, cnt) -> tezCounters.findCounter(type.value, path).setValue(Long.parseLong(cnt))));
+ }
+ } catch (Exception e) {
Review comment:
findCounter could throw a LimitExceededException
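A minimal sketch of the guard this suggests; `CounterGuard`/`setCounterSafely` are hypothetical names, assuming the unchecked `org.apache.tez.common.counters.LimitExceededException` is what `findCounter` raises at the limit:

```java
import org.apache.tez.common.counters.LimitExceededException;
import org.apache.tez.common.counters.TezCounters;

final class CounterGuard {
  // Best-effort counter update: findCounter can throw the unchecked
  // LimitExceededException once the counter limit is hit, and a metrics
  // failure should not fail the query itself.
  static void setCounterSafely(TezCounters counters, String group, String name, long value) {
    try {
      counters.findCounter(group, name).setValue(value);
    } catch (LimitExceededException e) {
      // swallow: delta metrics are best-effort
    }
  }
}
```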
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]