zratkai commented on code in PR #5557:
URL: https://github.com/apache/hive/pull/5557#discussion_r1856324242


##########
llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java:
##########
@@ -161,98 +149,44 @@ private void setupMDCFromNDC(final Callable<V> 
actualCallable) {
       }
     }
 
-    /**
-     * LLAP IO related counters.
-     */
-    public enum LlapExecutorCounters {
-      EXECUTOR_CPU_NS,
-      EXECUTOR_USER_NS;
-
-    }
-
-    private void updateCounters(final List<LlapUtil.StatisticsData> 
statsBefore,
-        final Callable<V> actualCallable, long cpuTime, long userTime) {
+    private void updateCounters(final LlapThreadLocalStatistics statsBefore,
+        final Callable<V> actualCallable) {
       Thread thread = Thread.currentThread();
-      TezCounters tezCounters = null;
-      // add tez counters for task execution and llap io
-      if (actualCallable instanceof TaskRunner2Callable) {
-        TaskRunner2Callable taskRunner2Callable = (TaskRunner2Callable) 
actualCallable;
-        // counters for task execution side
-        tezCounters = 
taskRunner2Callable.addAndGetTezCounter(FileSystemCounter.class.getName());
-      } else if (actualCallable instanceof TezCounterSource) {
-        // Other counter sources (currently used in LLAP IO).
-        tezCounters = ((TezCounterSource) actualCallable).getTezCounters();
-      } else {
-        LOG.warn("Unexpected callable {}; cannot get counters", 
actualCallable);
-      }
+      final TezCounters tezCounters = getTezCounters(actualCallable);
 
       if (tezCounters != null) {
-        if (cpuTime >= 0 && userTime >= 0) {
-          
tezCounters.findCounter(LlapExecutorCounters.EXECUTOR_CPU_NS).increment(cpuTime);
-          
tezCounters.findCounter(LlapExecutorCounters.EXECUTOR_USER_NS).increment(userTime);
-        }
         if (statsBefore != null) {
-          // if there are multiple stats for the same scheme (from different 
NameNode), this
-          // method will squash them together
-          Map<String, FileSystem.Statistics> schemeToStats = LlapUtil
-              .getCombinedFileSystemStatistics();
-          for (Map.Entry<String, FileSystem.Statistics> entry : 
schemeToStats.entrySet()) {
-            final String scheme = entry.getKey();
-            FileSystem.Statistics statistics = entry.getValue();
-            FileSystem.Statistics.StatisticsData threadFSStats = statistics
-                .getThreadStatistics();
-            List<LlapUtil.StatisticsData> allStatsBefore = LlapUtil
-                .getStatisticsForScheme(scheme, statsBefore);
-            long bytesReadDelta = 0;
-            long bytesWrittenDelta = 0;
-            long readOpsDelta = 0;
-            long largeReadOpsDelta = 0;
-            long writeOpsDelta = 0;
-            // there could be more scheme after execution as execution might 
be accessing a
-            // different filesystem. So if we don't find a matching scheme 
before execution we
-            // just use the after execution values directly without computing 
delta difference
-            if (allStatsBefore != null && !allStatsBefore.isEmpty()) {
-              for (LlapUtil.StatisticsData sb : allStatsBefore) {
-                bytesReadDelta += threadFSStats.getBytesRead() - 
sb.getBytesRead();
-                bytesWrittenDelta += threadFSStats.getBytesWritten() - 
sb.getBytesWritten();
-                readOpsDelta += threadFSStats.getReadOps() - sb.getReadOps();
-                largeReadOpsDelta += threadFSStats.getLargeReadOps() - 
sb.getLargeReadOps();
-                writeOpsDelta += threadFSStats.getWriteOps() - 
sb.getWriteOps();
-              }
-            } else {
-              bytesReadDelta = threadFSStats.getBytesRead();
-              bytesWrittenDelta = threadFSStats.getBytesWritten();
-              readOpsDelta = threadFSStats.getReadOps();
-              largeReadOpsDelta = threadFSStats.getLargeReadOps();
-              writeOpsDelta = threadFSStats.getWriteOps();
-            }
-            tezCounters.findCounter(scheme, FileSystemCounter.BYTES_READ)
-                .increment(bytesReadDelta);
-            tezCounters.findCounter(scheme, FileSystemCounter.BYTES_WRITTEN)
-                .increment(bytesWrittenDelta);
-            tezCounters.findCounter(scheme, 
FileSystemCounter.READ_OPS).increment(readOpsDelta);
-            tezCounters.findCounter(scheme, FileSystemCounter.LARGE_READ_OPS)
-                .increment(largeReadOpsDelta);
-            tezCounters.findCounter(scheme, FileSystemCounter.WRITE_OPS)
-                .increment(writeOpsDelta);
+          LlapThreadLocalStatistics currentStats = new 
LlapThreadLocalStatistics(mxBean);
+          currentStats.subtract(statsBefore).fill(tezCounters);
 
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Updated stats: instance: {} thread name: {} thread 
id: {} scheme: {} " +
-                      "bytesRead: {} bytesWritten: {} readOps: {} 
largeReadOps: {} writeOps: {}",
-                  actualCallable.getClass().getSimpleName(), thread.getName(), 
thread.getId(),
-                  scheme, bytesReadDelta, bytesWrittenDelta, readOpsDelta, 
largeReadOpsDelta,
-                  writeOpsDelta);
-            }
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Updated stats: instance: {} thread name: {} thread id: 
{} stats: {}",
+                actualCallable.getClass().getSimpleName(), thread.getName(), 
thread.getId(), currentStats);
           }
         } else {
           LOG.warn("File system statistics snapshot before execution of thread 
is null." +
-                  "Thread name: {} id: {} allStats: {}", thread.getName(), 
thread.getId(),
-              statsBefore);
+                  "Thread name: {} id: {}", thread.getName(), thread.getId());
         }
       } else {
         LOG.warn("TezCounters is null for callable type: {}",
             actualCallable.getClass().getSimpleName());
       }
     }
   }
+
+  private static <V> TezCounters getTezCounters(Callable<V> actualCallable) {
+    TezCounters tezCounters = null;
+    // add tez counters for task execution and llap io
+    if (actualCallable instanceof TaskRunner2Callable) {
+      TaskRunner2Callable taskRunner2Callable = (TaskRunner2Callable) 
actualCallable;
+      // counters for task execution side
+      tezCounters = 
taskRunner2Callable.addAndGetTezCounter(FileSystemCounter.class.getName());
+    } else if (actualCallable instanceof TezCounterSource) {
+      // Other counter sources (currently used in LLAP IO).
+      tezCounters = ((TezCounterSource) actualCallable).getTezCounters();
+    } else {
+      LOG.warn("Unexpected callable {}; cannot get counters", actualCallable);

Review Comment:
   Ok.



##########
llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java:
##########
@@ -161,98 +149,44 @@ private void setupMDCFromNDC(final Callable<V> 
actualCallable) {
       }
     }
 
-    /**
-     * LLAP IO related counters.
-     */
-    public enum LlapExecutorCounters {
-      EXECUTOR_CPU_NS,
-      EXECUTOR_USER_NS;
-
-    }
-
-    private void updateCounters(final List<LlapUtil.StatisticsData> 
statsBefore,
-        final Callable<V> actualCallable, long cpuTime, long userTime) {
+    private void updateCounters(final LlapThreadLocalStatistics statsBefore,
+        final Callable<V> actualCallable) {
       Thread thread = Thread.currentThread();
-      TezCounters tezCounters = null;
-      // add tez counters for task execution and llap io
-      if (actualCallable instanceof TaskRunner2Callable) {
-        TaskRunner2Callable taskRunner2Callable = (TaskRunner2Callable) 
actualCallable;
-        // counters for task execution side
-        tezCounters = 
taskRunner2Callable.addAndGetTezCounter(FileSystemCounter.class.getName());
-      } else if (actualCallable instanceof TezCounterSource) {
-        // Other counter sources (currently used in LLAP IO).
-        tezCounters = ((TezCounterSource) actualCallable).getTezCounters();
-      } else {
-        LOG.warn("Unexpected callable {}; cannot get counters", 
actualCallable);
-      }
+      final TezCounters tezCounters = getTezCounters(actualCallable);
 
       if (tezCounters != null) {
-        if (cpuTime >= 0 && userTime >= 0) {
-          
tezCounters.findCounter(LlapExecutorCounters.EXECUTOR_CPU_NS).increment(cpuTime);
-          
tezCounters.findCounter(LlapExecutorCounters.EXECUTOR_USER_NS).increment(userTime);
-        }
         if (statsBefore != null) {
-          // if there are multiple stats for the same scheme (from different 
NameNode), this
-          // method will squash them together
-          Map<String, FileSystem.Statistics> schemeToStats = LlapUtil
-              .getCombinedFileSystemStatistics();
-          for (Map.Entry<String, FileSystem.Statistics> entry : 
schemeToStats.entrySet()) {
-            final String scheme = entry.getKey();
-            FileSystem.Statistics statistics = entry.getValue();
-            FileSystem.Statistics.StatisticsData threadFSStats = statistics
-                .getThreadStatistics();
-            List<LlapUtil.StatisticsData> allStatsBefore = LlapUtil
-                .getStatisticsForScheme(scheme, statsBefore);
-            long bytesReadDelta = 0;
-            long bytesWrittenDelta = 0;
-            long readOpsDelta = 0;
-            long largeReadOpsDelta = 0;
-            long writeOpsDelta = 0;
-            // there could be more scheme after execution as execution might 
be accessing a
-            // different filesystem. So if we don't find a matching scheme 
before execution we
-            // just use the after execution values directly without computing 
delta difference
-            if (allStatsBefore != null && !allStatsBefore.isEmpty()) {
-              for (LlapUtil.StatisticsData sb : allStatsBefore) {
-                bytesReadDelta += threadFSStats.getBytesRead() - 
sb.getBytesRead();
-                bytesWrittenDelta += threadFSStats.getBytesWritten() - 
sb.getBytesWritten();
-                readOpsDelta += threadFSStats.getReadOps() - sb.getReadOps();
-                largeReadOpsDelta += threadFSStats.getLargeReadOps() - 
sb.getLargeReadOps();
-                writeOpsDelta += threadFSStats.getWriteOps() - 
sb.getWriteOps();
-              }
-            } else {
-              bytesReadDelta = threadFSStats.getBytesRead();
-              bytesWrittenDelta = threadFSStats.getBytesWritten();
-              readOpsDelta = threadFSStats.getReadOps();
-              largeReadOpsDelta = threadFSStats.getLargeReadOps();
-              writeOpsDelta = threadFSStats.getWriteOps();
-            }
-            tezCounters.findCounter(scheme, FileSystemCounter.BYTES_READ)
-                .increment(bytesReadDelta);
-            tezCounters.findCounter(scheme, FileSystemCounter.BYTES_WRITTEN)
-                .increment(bytesWrittenDelta);
-            tezCounters.findCounter(scheme, 
FileSystemCounter.READ_OPS).increment(readOpsDelta);
-            tezCounters.findCounter(scheme, FileSystemCounter.LARGE_READ_OPS)
-                .increment(largeReadOpsDelta);
-            tezCounters.findCounter(scheme, FileSystemCounter.WRITE_OPS)
-                .increment(writeOpsDelta);
+          LlapThreadLocalStatistics currentStats = new 
LlapThreadLocalStatistics(mxBean);
+          currentStats.subtract(statsBefore).fill(tezCounters);
 
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Updated stats: instance: {} thread name: {} thread 
id: {} scheme: {} " +
-                      "bytesRead: {} bytesWritten: {} readOps: {} 
largeReadOps: {} writeOps: {}",
-                  actualCallable.getClass().getSimpleName(), thread.getName(), 
thread.getId(),
-                  scheme, bytesReadDelta, bytesWrittenDelta, readOpsDelta, 
largeReadOpsDelta,
-                  writeOpsDelta);
-            }
+          if (LOG.isDebugEnabled()) {

Review Comment:
   Ok.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org

Reply via email to