odraese commented on a change in pull request #944: [Hive-22760] Adding Clock 
based eviction strategy.
URL: https://github.com/apache/hive/pull/944#discussion_r395928192
 
 

 ##########
 File path: 
llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
 ##########
 @@ -91,143 +74,148 @@
   public static final Logger CACHE_LOGGER = 
LoggerFactory.getLogger("LlapIoCache");
   public static final Logger LOCKING_LOGGER = 
LoggerFactory.getLogger("LlapIoLocking");
   private static final String MODE_CACHE = "cache";
+  private static final String DISPLAY_NAME = "LlapDaemonCacheMetrics-" + 
MetricsUtils.getHostName();
+  private static final String IO_DISPLAY_NAME = "LlapDaemonIOMetrics-" + 
MetricsUtils.getHostName();
 
   // TODO: later, we may have a map
   private final ColumnVectorProducer orcCvp, genericCvp;
   private final ExecutorService executor;
   private final LlapDaemonCacheMetrics cacheMetrics;
   private final LlapDaemonIOMetrics ioMetrics;
-  private ObjectName buddyAllocatorMXBean;
+  private final ObjectName buddyAllocatorMXBean;
   private final Allocator allocator;
   private final FileMetadataCache fileMetadataCache;
   private final LowLevelCache dataCache;
   private final BufferUsageManager bufferManager;
   private final Configuration daemonConf;
   private final LowLevelCacheMemoryManager memoryManager;
-
-  private List<LlapIoDebugDump> debugDumpComponents = new ArrayList<>();
-
-  private LlapIoImpl(Configuration conf) throws IOException {
-    this.daemonConf = conf;
-    String ioMode = HiveConf.getVar(conf, 
HiveConf.ConfVars.LLAP_IO_MEMORY_MODE);
-    boolean useLowLevelCache = LlapIoImpl.MODE_CACHE.equalsIgnoreCase(ioMode);
-    LOG.info("Initializing LLAP IO in {} mode", useLowLevelCache ? 
LlapIoImpl.MODE_CACHE : "none");
-    String displayName = "LlapDaemonCacheMetrics-" + 
MetricsUtils.getHostName();
-    String sessionId = conf.get("llap.daemon.metrics.sessionid");
-    this.cacheMetrics = LlapDaemonCacheMetrics.create(displayName, sessionId);
-
-    displayName = "LlapDaemonIOMetrics-" + MetricsUtils.getHostName();
-    String[] strIntervals = HiveConf.getTrimmedStringsVar(conf,
-        HiveConf.ConfVars.LLAP_IO_DECODING_METRICS_PERCENTILE_INTERVALS);
-    List<Integer> intervalList = new ArrayList<>();
-    if (strIntervals != null) {
-      for (String strInterval : strIntervals) {
-        try {
-          intervalList.add(Integer.valueOf(strInterval));
-        } catch (NumberFormatException e) {
-          LOG.warn("Ignoring IO decoding metrics interval {} from {} as it is 
invalid", strInterval,
-              Arrays.toString(strIntervals));
-        }
-      }
+  private final List<LlapIoDebugDump> debugDumpComponents = new ArrayList<>();
+
+  /**
+   * Llap IO is created via Reflection by {@link 
org.apache.hadoop.hive.llap.io.api.LlapProxy}.
+   *
+   * @param conf Configuration containing all the needed parameters and flags 
to initialize LLAP IO.
+   */
+  private LlapIoImpl(Configuration conf) {
+    this.daemonConf = Preconditions.checkNotNull(conf);
+    final boolean
+        useLowLevelCache =
+        LlapIoImpl.MODE_CACHE.equalsIgnoreCase(HiveConf.getVar(conf, 
HiveConf.ConfVars.LLAP_IO_MEMORY_MODE));
+    final boolean isEncodeEnabled = useLowLevelCache && 
HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_ENCODE_ENABLED);
+    LOG.info("Initializing LLAP IO in {} mode, with Encoding = {} ",
+        useLowLevelCache ? LlapIoImpl.MODE_CACHE : "none",
+        isEncodeEnabled);
+
+    //setup metrics reporters
+    final String sessionId = conf.get("llap.daemon.metrics.sessionid");
+    final String[]
+        strIntervals =
+        HiveConf.getTrimmedStringsVar(conf, 
HiveConf.ConfVars.LLAP_IO_DECODING_METRICS_PERCENTILE_INTERVALS);
+    int[] intArray = stringsToIntegers(strIntervals);
+    if (strIntervals != null && strIntervals.length != intArray.length) {
+      LOG.warn("Ignoring IO decoding metrics interval from {} as it is invalid 
due to Number format exception",
+          Arrays.toString(strIntervals));
     }
-    this.ioMetrics = LlapDaemonIOMetrics.create(displayName, sessionId, 
Ints.toArray(intervalList));
+    this.ioMetrics = LlapDaemonIOMetrics.create(IO_DISPLAY_NAME, sessionId, 
intArray);
+    this.cacheMetrics = LlapDaemonCacheMetrics.create(DISPLAY_NAME, sessionId);
+    LOG.info("Started llap daemon metrics with displayName: {} sessionId: {}", 
IO_DISPLAY_NAME, sessionId);
 
-    LOG.info("Started llap daemon metrics with displayName: {} sessionId: {}", 
displayName,
-        sessionId);
+    int numThreads = HiveConf.getIntVar(conf, 
HiveConf.ConfVars.LLAP_IO_THREADPOOL_SIZE);
+    this.executor =
+        new StatsRecordingThreadPool(numThreads,
+            numThreads,
+            0L,
+            TimeUnit.MILLISECONDS,
+            new LinkedBlockingQueue<>(),
+            new 
ThreadFactoryBuilder().setNameFormat("IO-Elevator-Thread-%d").setDaemon(true).build());
+    LOG.info("Created IO Elevator Thread pool with {} Threads ", numThreads);
+    // IO thread pool. Listening is used for unhandled errors for now (TODO: 
remove?)
+    FixedSizedObjectPool<IoTrace> tracePool = IoTrace.createTracePool(conf, 
numThreads);
 
-    MetadataCache metadataCache = null;
-    SerDeLowLevelCacheImpl serdeCache = null; // TODO: extract interface when 
needed
-    BufferUsageManager bufferManagerOrc = null, bufferManagerGeneric = null;
-    boolean isEncodeEnabled = useLowLevelCache
-        && HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_ENCODE_ENABLED);
+    final MetadataCache metadataCache;
+    final BufferUsageManager bufferManagerOrc, bufferManagerGeneric;
+    final SerDeLowLevelCacheImpl serdeCache; // TODO: extract interface when 
needed
     if (useLowLevelCache) {
       // Memory manager uses cache policy to trigger evictions, so create the 
policy first.
-      boolean useLrfu = HiveConf.getBoolVar(conf, 
HiveConf.ConfVars.LLAP_USE_LRFU);
-      long totalMemorySize = HiveConf.getSizeVar(conf, 
ConfVars.LLAP_IO_MEMORY_MAX_SIZE);
-      int minAllocSize = (int) HiveConf.getSizeVar(conf, 
ConfVars.LLAP_ALLOCATOR_MIN_ALLOC);
-      LowLevelCachePolicy
-          realCachePolicy =
-          useLrfu ? new LowLevelLrfuCachePolicy(minAllocSize, totalMemorySize, 
conf) : new LowLevelFifoCachePolicy();
-      boolean trackUsage = HiveConf.getBoolVar(conf, 
HiveConf.ConfVars.LLAP_TRACK_CACHE_USAGE);
-      LowLevelCachePolicy cachePolicyWrapper;
-      if (trackUsage) {
-        cachePolicyWrapper = new CacheContentsTracker(realCachePolicy);
-      } else {
-        cachePolicyWrapper = realCachePolicy;
-      }
+      final long totalMemorySize = HiveConf.getSizeVar(conf, 
ConfVars.LLAP_IO_MEMORY_MAX_SIZE);
+
+      final LowLevelCachePolicy cachePolicyWrapper = 
LowLevelCachePolicy.provideFromConf(conf);
+
       // Allocator uses memory manager to request memory, so create the 
manager next.
-      this.memoryManager = new LowLevelCacheMemoryManager(
-          totalMemorySize, cachePolicyWrapper, cacheMetrics);
-      cacheMetrics.setCacheCapacityTotal(totalMemorySize);
+      this.memoryManager = new LowLevelCacheMemoryManager(totalMemorySize, 
cachePolicyWrapper, cacheMetrics);
+      this.cacheMetrics.setCacheCapacityTotal(totalMemorySize);
       // Cache uses allocator to allocate and deallocate, create allocator and 
then caches.
       BuddyAllocator allocator = new BuddyAllocator(conf, memoryManager, 
cacheMetrics);
       this.allocator = allocator;
-      LowLevelCacheImpl cacheImpl = new LowLevelCacheImpl(
-          cacheMetrics, cachePolicyWrapper, allocator, true);
-      dataCache = cacheImpl;
+      LowLevelCacheImpl cacheImpl = new LowLevelCacheImpl(cacheMetrics, 
cachePolicyWrapper, allocator, true);
+      this.dataCache = cacheImpl;
       if (isEncodeEnabled) {
-        SerDeLowLevelCacheImpl serdeCacheImpl = new SerDeLowLevelCacheImpl(
-            cacheMetrics, cachePolicyWrapper, allocator);
-        serdeCache = serdeCacheImpl;
-        serdeCacheImpl.setConf(conf);
+        serdeCache = new SerDeLowLevelCacheImpl(cacheMetrics, 
cachePolicyWrapper, allocator);
+        serdeCache.setConf(conf);
+      } else {
+        serdeCache = null;
       }
-
-      boolean useGapCache = HiveConf.getBoolVar(conf, 
ConfVars.LLAP_CACHE_ENABLE_ORC_GAP_CACHE);
-      metadataCache = new MetadataCache(
-          allocator, memoryManager, cachePolicyWrapper, useGapCache, 
cacheMetrics);
-      fileMetadataCache = metadataCache;
+      final boolean useGapCache = HiveConf.getBoolVar(conf, 
ConfVars.LLAP_CACHE_ENABLE_ORC_GAP_CACHE);
+      metadataCache = new MetadataCache(allocator, memoryManager, 
cachePolicyWrapper, useGapCache, cacheMetrics);
+      this.fileMetadataCache = metadataCache;
       // And finally cache policy uses cache to notify it of eviction. The 
cycle is complete!
-      EvictionDispatcher e = new EvictionDispatcher(
-          dataCache, serdeCache, metadataCache, allocator);
+      final EvictionDispatcher e = new EvictionDispatcher(dataCache, 
serdeCache, metadataCache, allocator);
       cachePolicyWrapper.setEvictionListener(e);
-
       cacheImpl.startThreads(); // Start the cache threads.
       bufferManager = bufferManagerOrc = cacheImpl; // Cache also serves as 
buffer manager.
       bufferManagerGeneric = serdeCache;
-      if (trackUsage) {
-        debugDumpComponents.add(cachePolicyWrapper); // Cache contents tracker.
-      }
-      debugDumpComponents.add(realCachePolicy);
+      debugDumpComponents.add(cachePolicyWrapper); // Cache contents tracker.
       debugDumpComponents.add(cacheImpl);
       if (serdeCache != null) {
         debugDumpComponents.add(serdeCache);
       }
-      if (metadataCache != null) {
-        debugDumpComponents.add(metadataCache);
-      }
+      debugDumpComponents.add(metadataCache);
       debugDumpComponents.add(allocator);
     } else {
       this.allocator = new SimpleAllocator(conf);
-      fileMetadataCache = null;
-      SimpleBufferManager sbm = new SimpleBufferManager(allocator, 
cacheMetrics);
-      bufferManager = bufferManagerOrc = bufferManagerGeneric = sbm;
-      dataCache = sbm;
+      final SimpleBufferManager sbm = new SimpleBufferManager(allocator, 
cacheMetrics);
+      this.bufferManager = bufferManagerOrc = bufferManagerGeneric = sbm;
+      this.dataCache = sbm;
+      this.debugDumpComponents.add(sb -> sb.append("LLAP IO allocator is not 
in use!"));
+      this.fileMetadataCache = null;
       this.memoryManager = null;
-      debugDumpComponents.add(new LlapIoDebugDump() {
-        @Override
-        public void debugDumpShort(StringBuilder sb) {
-          sb.append("LLAP IO allocator is not in use!");
-        }
-      });
+      serdeCache = null;
+      metadataCache = null;
     }
-    // IO thread pool. Listening is used for unhandled errors for now (TODO: 
remove?)
-    int numThreads = HiveConf.getIntVar(conf, 
HiveConf.ConfVars.LLAP_IO_THREADPOOL_SIZE);
-    executor = new StatsRecordingThreadPool(numThreads, numThreads, 0L, 
TimeUnit.MILLISECONDS,
-        new LinkedBlockingQueue<Runnable>(),
-        new 
ThreadFactoryBuilder().setNameFormat("IO-Elevator-Thread-%d").setDaemon(true).build());
-    FixedSizedObjectPool<IoTrace> tracePool = IoTrace.createTracePool(conf);
-    // TODO: this should depends on input format and be in a map, or something.
-    this.orcCvp = new OrcColumnVectorProducer(
-        metadataCache, dataCache, bufferManagerOrc, conf, cacheMetrics, 
ioMetrics, tracePool);
-    this.genericCvp = isEncodeEnabled ? new GenericColumnVectorProducer(
-        serdeCache, bufferManagerGeneric, conf, cacheMetrics, ioMetrics, 
tracePool) : null;
-    LOG.info("LLAP IO initialized");
 
-    registerMXBeans();
+    // TODO: this should depends on input format and be in a map, or something.
+    this.orcCvp =
+        new OrcColumnVectorProducer(metadataCache,
+            dataCache,
+            bufferManagerOrc,
+            conf,
+            cacheMetrics,
+            ioMetrics,
+            tracePool);
+    this.genericCvp =
+        isEncodeEnabled ?
+            new GenericColumnVectorProducer(serdeCache,
+                bufferManagerGeneric,
+                conf,
+                cacheMetrics,
+                ioMetrics,
+                tracePool) :
+            null;
+    this.buddyAllocatorMXBean = MBeans.register("LlapDaemon", 
"BuddyAllocatorInfo", allocator);
+    LOG.info("LLAP IO successful initialized.");
   }
 
-  private void registerMXBeans() {
-    buddyAllocatorMXBean = MBeans.register("LlapDaemon", "BuddyAllocatorInfo", 
allocator);
+  private static int[] stringsToIntegers(String[] strings) {
+    if (strings == null) {
+      return new int[0];
+    }
+    return Arrays.stream(strings).map(x -> {
 
 Review comment:
   Not that I don't appreciate a nice stream function, but I suspect this is the 
least efficient method to convert integer strings to int. Each instance is 
wrapped into an Optional just to filter out the non-parsable ones directly again?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to