This is an automated email from the ASF dual-hosted git repository.

karanmehta93 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new 982b09a  PHOENIX-5069 Use asynchronous refresh to provide non-blocking 
Phoenix Stats Client Cache
982b09a is described below

commit 982b09adb43e8f837b5e6a2cd23921ec08e33065
Author: Bin <b...@salesforce.com>
AuthorDate: Sun Oct 28 12:21:27 2018 -0700

    PHOENIX-5069 Use asynchronous refresh to provide non-blocking Phoenix Stats 
Client Cache
    
    Signed-off-by: Bin <b...@salesforce.com>
---
 .../org/apache/phoenix/query/GuidePostsCache.java  |  86 ++++++++----
 .../phoenix/query/PhoenixStatsCacheLoader.java     |  90 ++++++++++++
 .../apache/phoenix/query/PhoenixStatsLoader.java   |  57 ++++++++
 .../org/apache/phoenix/query/QueryServices.java    |   4 +-
 .../apache/phoenix/query/QueryServicesOptions.java |   7 +
 .../phoenix/schema/stats/GuidePostsInfo.java       |   6 +-
 .../phoenix/query/PhoenixStatsCacheLoaderTest.java | 156 +++++++++++++++++++++
 7 files changed, 377 insertions(+), 29 deletions(-)

diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/GuidePostsCache.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/GuidePostsCache.java
index 26e40f6..b24a1e3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/GuidePostsCache.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/GuidePostsCache.java
@@ -22,11 +22,11 @@ import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_STATS_COLLEC
 import java.io.IOException;
 import java.util.List;
 import java.util.Objects;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptor;
@@ -42,7 +42,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.common.cache.RemovalCause;
 import com.google.common.cache.RemovalListener;
@@ -61,18 +60,25 @@ public class GuidePostsCache {
 
     public GuidePostsCache(ConnectionQueryServices queryServices, 
Configuration config) {
         this.queryServices = Objects.requireNonNull(queryServices);
+
         // Number of millis to expire cache values after write
         final long statsUpdateFrequency = config.getLong(
                 QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB,
                 QueryServicesOptions.DEFAULT_STATS_UPDATE_FREQ_MS);
-        // Maximum number of entries (tables) to store in the cache at one time
+
+        // Maximum total weight (size in bytes) of stats entries
         final long maxTableStatsCacheSize = config.getLong(
                 QueryServices.STATS_MAX_CACHE_SIZE,
                 QueryServicesOptions.DEFAULT_STATS_MAX_CACHE_SIZE);
+
                final boolean isStatsEnabled = 
config.getBoolean(STATS_COLLECTION_ENABLED, DEFAULT_STATS_COLLECTION_ENABLED);
+
+        PhoenixStatsCacheLoader cacheLoader = new PhoenixStatsCacheLoader(
+                isStatsEnabled ? new StatsLoaderImpl() : new 
EmptyStatsLoader(), config);
+
         cache = CacheBuilder.newBuilder()
-                // Expire entries a given amount of time after they were 
written
-                .expireAfterWrite(statsUpdateFrequency, TimeUnit.MILLISECONDS)
+                // Refresh entries a given amount of time after they were 
written
+                .refreshAfterWrite(statsUpdateFrequency, TimeUnit.MILLISECONDS)
                 // Maximum total weight (size in bytes) of stats entries
                 .maximumWeight(maxTableStatsCacheSize)
                 // Defer actual size to the PTableStats.getEstimatedSize()
@@ -83,19 +89,38 @@ public class GuidePostsCache {
                 })
                 // Log removals at TRACE for debugging
                 .removalListener(new PhoenixStatsCacheRemovalListener())
-                // Automatically load the cache when entries are missing
-                .build(isStatsEnabled ? new StatsLoader() : new 
EmptyStatsLoader());
+                // Automatically load the cache when entries need to be 
refreshed
+                .build(cacheLoader);
     }
 
     /**
-     * {@link CacheLoader} implementation for the Phoenix Table Stats cache.
+     * {@link PhoenixStatsLoader} implementation for the Stats Loader.
      */
-    protected class StatsLoader extends CacheLoader<GuidePostsKey, 
GuidePostsInfo> {
+    protected class StatsLoaderImpl implements PhoenixStatsLoader {
+        @Override
+        public boolean needsLoad() {
+            // For now, whenever it's called, we try to load stats from stats 
table
+            // no matter it has been updated or not.
+            // Here are the possible optimizations we can do here:
+            // 1. Load stats from the stats table only when the stats get 
updated on the server side.
+            // 2. Support different refresh cycle for different tables.
+            return true;
+        }
+
+        @Override
+        public GuidePostsInfo loadStats(GuidePostsKey statsKey) throws 
Exception {
+            return loadStats(statsKey, GuidePostsInfo.NO_GUIDEPOST);
+        }
+
         @Override
-        public GuidePostsInfo load(GuidePostsKey statsKey) throws Exception {
-            Table statsHTable = 
queryServices.getTable(SchemaUtil.getPhysicalName(
+        public GuidePostsInfo loadStats(GuidePostsKey statsKey, GuidePostsInfo 
prevGuidepostInfo) throws Exception {
+            assert(prevGuidepostInfo != null);
+
+            TableName tableName = SchemaUtil.getPhysicalName(
                     PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES,
-                            queryServices.getProps()).getName());
+                    queryServices.getProps());
+            Table statsHTable = queryServices.getTable(tableName.getName());
+
             try {
                 GuidePostsInfo guidePostsInfo = 
StatisticsUtil.readStatistics(statsHTable, statsKey,
                         HConstants.LATEST_TIMESTAMP);
@@ -103,18 +128,17 @@ public class GuidePostsCache {
                 return guidePostsInfo;
             } catch (TableNotFoundException e) {
                 // On a fresh install, stats might not yet be created, don't 
warn about this.
-                logger.debug("Unable to locate Phoenix stats table", e);
-                return GuidePostsInfo.NO_GUIDEPOST;
+                logger.debug("Unable to locate Phoenix stats table: " + 
tableName.toString(), e);
+                return prevGuidepostInfo;
             } catch (IOException e) {
-                logger.warn("Unable to read from stats table", e);
-                // Just cache empty stats. We'll try again after some time 
anyway.
-                return GuidePostsInfo.NO_GUIDEPOST;
+                logger.warn("Unable to read from stats table: " + 
tableName.toString(), e);
+                return prevGuidepostInfo;
             } finally {
                 try {
                     statsHTable.close();
                 } catch (IOException e) {
                     // Log, but continue. We have our stats anyway now.
-                    logger.warn("Unable to close stats table", e);
+                    logger.warn("Unable to close stats table: " + 
tableName.toString(), e);
                 }
             }
         }
@@ -125,20 +149,30 @@ public class GuidePostsCache {
         void traceStatsUpdate(GuidePostsKey key, GuidePostsInfo info) {
             if (logger.isTraceEnabled()) {
                 logger.trace("Updating local TableStats cache (id={}) for {}, 
size={}bytes",
-                      new Object[] {Objects.hashCode(GuidePostsCache.this), 
key,
-                      info.getEstimatedSize()});
+                        new Object[] {Objects.hashCode(GuidePostsCache.this), 
key, info.getEstimatedSize()});
             }
         }
     }
 
     /**
+     * {@link PhoenixStatsLoader} implementation for the Stats Loader.
      * Empty stats loader if stats are disabled
      */
-       protected class EmptyStatsLoader extends CacheLoader<GuidePostsKey, 
GuidePostsInfo> {
-               @Override
-               public GuidePostsInfo load(GuidePostsKey statsKey) throws 
Exception {
-                       return GuidePostsInfo.NO_GUIDEPOST;
-               }
+       protected class EmptyStatsLoader implements PhoenixStatsLoader {
+        @Override
+        public boolean needsLoad() {
+            return false;
+        }
+
+        @Override
+        public GuidePostsInfo loadStats(GuidePostsKey statsKey) throws 
Exception {
+            return GuidePostsInfo.NO_GUIDEPOST;
+        }
+
+        @Override
+        public GuidePostsInfo loadStats(GuidePostsKey statsKey, GuidePostsInfo 
prevGuidepostInfo) throws Exception {
+            return GuidePostsInfo.NO_GUIDEPOST;
+        }
        }
 
     /**
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/PhoenixStatsCacheLoader.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/query/PhoenixStatsCacheLoader.java
new file mode 100644
index 0000000..8caa934
--- /dev/null
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/PhoenixStatsCacheLoader.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.query;
+
+import com.google.common.cache.CacheLoader;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListenableFutureTask;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.schema.stats.GuidePostsInfo;
+import org.apache.phoenix.schema.stats.GuidePostsKey;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * {@link CacheLoader} implementation for the Phoenix Table Stats cache.
+ */
+public class PhoenixStatsCacheLoader extends CacheLoader<GuidePostsKey, 
GuidePostsInfo> {
+    private static final Logger logger = 
LoggerFactory.getLogger(PhoenixStatsCacheLoader.class);
+
+    final private PhoenixStatsLoader statsLoader;
+    private static volatile ExecutorService executor;
+
+    public PhoenixStatsCacheLoader(PhoenixStatsLoader statsLoader, 
Configuration config) {
+        this.statsLoader = statsLoader;
+
+        if (this.executor == null) {
+            synchronized (PhoenixStatsCacheLoader.class) {
+                if (this.executor == null) {
+                    // The size of the thread pool used for refreshing cached 
table stats
+                    final int statsCacheThreadPoolSize = config.getInt(
+                            QueryServices.STATS_CACHE_THREAD_POOL_SIZE,
+                            
QueryServicesOptions.DEFAULT_STATS_CACHE_THREAD_POOL_SIZE);
+
+                    this.executor = 
Executors.newFixedThreadPool(statsCacheThreadPoolSize);
+                }
+            }
+        }
+    }
+
+    @Override
+    public GuidePostsInfo load(GuidePostsKey statsKey) throws Exception {
+        return statsLoader.loadStats(statsKey);
+    }
+
+    @Override
+    public ListenableFuture<GuidePostsInfo> reload(
+            final GuidePostsKey key,
+            GuidePostsInfo prevGuidepostInfo)
+    {
+        if (statsLoader.needsLoad()) {
+            // schedule asynchronous task
+            ListenableFutureTask<GuidePostsInfo> task =
+                    ListenableFutureTask.create(new Callable<GuidePostsInfo>() 
{
+                        public GuidePostsInfo call() {
+                            try {
+                                return statsLoader.loadStats(key, 
prevGuidepostInfo);
+                            } catch (Exception e) {
+                                logger.warn("Unable to load stats from table: 
" + key.toString(), e);
+                                return prevGuidepostInfo;
+                            }
+                        }
+                    });
+            executor.execute(task);
+            return task;
+        }
+        else {
+            return Futures.immediateFuture(prevGuidepostInfo);
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/PhoenixStatsLoader.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/PhoenixStatsLoader.java
new file mode 100644
index 0000000..eda5e56
--- /dev/null
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/PhoenixStatsLoader.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.query;
+
+import org.apache.phoenix.schema.stats.GuidePostsInfo;
+import org.apache.phoenix.schema.stats.GuidePostsKey;
+
+/**
+ * The interface for talking to underneath layers to load stats from stats 
table for a given key
+ */
+public interface PhoenixStatsLoader {
+    /**
+     * Use to check whether this is the time to load stats from stats table.
+     * There are two cases:
+     * a. After a specified duration has passed
+     * b. The stats on server side (e.g. in stats table) has been updated
+     *
+     * @return boolean indicates whether we need to load stats or not
+     */
+    boolean needsLoad();
+
+    /**
+     * Called by client stats cache to load stats from underneath layers
+     *
+     * @param statsKey the stats key used to search the stats on server side 
(in stats table)
+     * @throws Exception
+     *
+     * @return GuidePostsInfo retrieved from sever side
+     */
+    GuidePostsInfo loadStats(GuidePostsKey statsKey) throws Exception;
+
+    /**
+     * Called by client stats cache to load stats from underneath layers
+     *
+     * @param statsKey the stats key used to search the stats on server side 
(in stats table)
+     * @param prevGuidepostInfo the existing stats cached on the client side 
or GuidePostsInfo.NO_GUIDEPOST
+     * @throws Exception
+     *
+     * @return GuidePostsInfo retrieved from sever side
+     */
+    GuidePostsInfo loadStats(GuidePostsKey statsKey, GuidePostsInfo 
prevGuidepostInfo) throws Exception;
+}
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index e279f05..fc11539 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -197,8 +197,10 @@ public interface QueryServices extends SQLCloseable {
     public static final String COMMIT_STATS_ASYNC = 
"phoenix.stats.commit.async";
     // Maximum size in bytes taken up by cached table stats in the client
     public static final String STATS_MAX_CACHE_SIZE = 
"phoenix.stats.cache.maxSize";
-    public static final String LOG_SALT_BUCKETS_ATTRIB = 
"phoenix.log.saltBuckets";
+    // The size of the thread pool used for refreshing cached table stats in 
stats client cache
+    public static final String STATS_CACHE_THREAD_POOL_SIZE = 
"phoenix.stats.cache.threadPoolSize";
 
+    public static final String LOG_SALT_BUCKETS_ATTRIB = 
"phoenix.log.saltBuckets";
     public static final String SEQUENCE_SALT_BUCKETS_ATTRIB = 
"phoenix.sequence.saltBuckets";
     public static final String COPROCESSOR_PRIORITY_ATTRIB = 
"phoenix.coprocessor.priority";
     public static final String EXPLAIN_CHUNK_COUNT_ATTRIB = 
"phoenix.explain.displayChunkCount";
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index b8cfe1f..e71e531 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -86,6 +86,7 @@ import static 
org.apache.phoenix.query.QueryServices.SPOOL_THRESHOLD_BYTES_ATTRI
 import static org.apache.phoenix.query.QueryServices.STATS_COLLECTION_ENABLED;
 import static 
org.apache.phoenix.query.QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB;
 import static 
org.apache.phoenix.query.QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB;
+import static 
org.apache.phoenix.query.QueryServices.STATS_CACHE_THREAD_POOL_SIZE;
 import static 
org.apache.phoenix.query.QueryServices.STATS_USE_CURRENT_TIME_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.THREAD_POOL_SIZE_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.THREAD_TIMEOUT_MS_ATTRIB;
@@ -246,6 +247,7 @@ public class QueryServicesOptions {
     public static final long DEFAULT_STATS_MAX_CACHE_SIZE = 256 * 1024 * 1024;
     // Allow stats collection to be initiated by client multiple times 
immediately
     public static final int DEFAULT_MIN_STATS_UPDATE_FREQ_MS = 0;
+    public static final int DEFAULT_STATS_CACHE_THREAD_POOL_SIZE = 4;
 
     public static final boolean DEFAULT_USE_REVERSE_SCAN = true;
 
@@ -413,6 +415,7 @@ public class QueryServicesOptions {
             .setIfUnset(DATE_FORMAT_TIMEZONE_ATTRIB, 
DEFAULT_DATE_FORMAT_TIMEZONE)
             .setIfUnset(STATS_UPDATE_FREQ_MS_ATTRIB, 
DEFAULT_STATS_UPDATE_FREQ_MS)
             .setIfUnset(MIN_STATS_UPDATE_FREQ_MS_ATTRIB, 
DEFAULT_MIN_STATS_UPDATE_FREQ_MS)
+            .setIfUnset(STATS_CACHE_THREAD_POOL_SIZE, 
DEFAULT_STATS_CACHE_THREAD_POOL_SIZE)
             .setIfUnset(CALL_QUEUE_ROUND_ROBIN_ATTRIB, 
DEFAULT_CALL_QUEUE_ROUND_ROBIN)
             .setIfUnset(MAX_MUTATION_SIZE_ATTRIB, DEFAULT_MAX_MUTATION_SIZE)
             .setIfUnset(ROW_KEY_ORDER_SALTED_TABLE_ATTRIB, 
DEFAULT_FORCE_ROW_KEY_ORDER)
@@ -736,6 +739,10 @@ public class QueryServicesOptions {
         return set(MIN_STATS_UPDATE_FREQ_MS_ATTRIB, frequencyMs);
     }
 
+    public QueryServicesOptions setStatsCacheThreadPoolSize(int 
threadPoolSize) {
+        return set(STATS_CACHE_THREAD_POOL_SIZE, threadPoolSize);
+    }
+
     public QueryServicesOptions setSequenceSaltBuckets(int saltBuckets) {
         config.setInt(SEQUENCE_SALT_BUCKETS_ATTRIB, saltBuckets);
         return this;
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/GuidePostsInfo.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/GuidePostsInfo.java
index 04c69bf..9492a35 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/GuidePostsInfo.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/GuidePostsInfo.java
@@ -95,15 +95,17 @@ public class GuidePostsInfo {
         this.guidePostsCount = guidePostsCount;
         this.rowCounts = Longs.toArray(rowCounts);
         this.byteCounts = Longs.toArray(byteCounts);
-        int estimatedSize = SizedUtil.OBJECT_SIZE 
+        this.gpTimestamps = Longs.toArray(updateTimes);
+        // Those Java equivalents of sizeof() in C/C++, mentioned on the Web, 
might be overkilled here.
+        int estimatedSize = SizedUtil.OBJECT_SIZE
                 + SizedUtil.IMMUTABLE_BYTES_WRITABLE_SIZE + 
guidePosts.getLength() // guidePosts
                 + SizedUtil.INT_SIZE // maxLength
                 + SizedUtil.INT_SIZE // guidePostsCount
                 + SizedUtil.ARRAY_SIZE + this.rowCounts.length * 
SizedUtil.LONG_SIZE // rowCounts
                 + SizedUtil.ARRAY_SIZE + this.byteCounts.length * 
SizedUtil.LONG_SIZE // byteCounts
+                + SizedUtil.ARRAY_SIZE + this.gpTimestamps.length * 
SizedUtil.LONG_SIZE // gpTimestamps
                 + SizedUtil.INT_SIZE; // estimatedSize
         this.estimatedSize = estimatedSize;
-        this.gpTimestamps = Longs.toArray(updateTimes);
     }
     
     public ImmutableBytesWritable getGuidePosts() {
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/query/PhoenixStatsCacheLoaderTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/query/PhoenixStatsCacheLoaderTest.java
new file mode 100644
index 0000000..e9c6d40
--- /dev/null
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/query/PhoenixStatsCacheLoaderTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.query;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.Weigher;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.schema.stats.GuidePostsInfo;
+import org.apache.phoenix.schema.stats.GuidePostsKey;
+import org.apache.phoenix.util.ByteUtil;
+import org.junit.Test;
+
+import java.lang.Thread;
+import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Test class around the PhoenixStatsCacheLoader.
+ */
+public class PhoenixStatsCacheLoaderTest {
+    /**
+     * {@link PhoenixStatsLoader} test implementation for the Stats Loader.
+     */
+    protected class TestStatsLoaderImpl implements PhoenixStatsLoader {
+        private int maxLength = 1;
+        private final CountDownLatch firstTimeRefreshedSignal;
+        private final CountDownLatch secondTimeRefreshedSignal;
+
+        public TestStatsLoaderImpl(CountDownLatch firstTimeRefreshedSignal, 
CountDownLatch secondTimeRefreshedSignal) {
+            this.firstTimeRefreshedSignal = firstTimeRefreshedSignal;
+            this.secondTimeRefreshedSignal = secondTimeRefreshedSignal;
+        }
+
+        @Override
+        public boolean needsLoad() {
+            // Whenever it's called, we try to load stats from stats table
+            // no matter it has been updated or not.
+            return true;
+        }
+
+        @Override
+        public GuidePostsInfo loadStats(GuidePostsKey statsKey) throws 
Exception {
+            return new GuidePostsInfo(Collections.<Long> emptyList(),
+                    new ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY),
+                    Collections.<Long> emptyList(), maxLength++, 0, 
Collections.<Long> emptyList());
+        }
+
+        @Override
+        public GuidePostsInfo loadStats(GuidePostsKey statsKey, GuidePostsInfo 
prevGuidepostInfo) throws Exception {
+            firstTimeRefreshedSignal.countDown();
+            secondTimeRefreshedSignal.countDown();
+
+            return new GuidePostsInfo(Collections.<Long> emptyList(),
+                    new ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY),
+                    Collections.<Long> emptyList(), maxLength++, 0, 
Collections.<Long> emptyList());
+        }
+    }
+
+    GuidePostsInfo getStats(LoadingCache<GuidePostsKey, GuidePostsInfo> cache, 
GuidePostsKey guidePostsKey) {
+        GuidePostsInfo guidePostsInfo;
+        try {
+            guidePostsInfo = cache.get(guidePostsKey);
+        } catch (ExecutionException e) {
+            assertFalse(true);
+            return GuidePostsInfo.NO_GUIDEPOST;
+        }
+
+        return guidePostsInfo;
+    }
+
+    void sleep(int x) {
+        try {
+            Thread.sleep(x);
+        }
+        catch (InterruptedException e) {
+            assertFalse(true);
+        }
+    }
+
+    @Test
+    public void testStatsBeingAutomaticallyRefreshed() {
+        ExecutorService executor = Executors.newFixedThreadPool(4);
+
+        CountDownLatch firstTimeRefreshedSignal = new CountDownLatch(1);
+        CountDownLatch secondTimeRefreshedSignal = new CountDownLatch(2);
+
+        Configuration config = 
HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
+
+        LoadingCache<GuidePostsKey, GuidePostsInfo> cache = 
CacheBuilder.newBuilder()
+                // Refresh entries a given amount of time after they were 
written
+                .refreshAfterWrite(100, TimeUnit.MILLISECONDS)
+                // Maximum total weight (size in bytes) of stats entries
+                
.maximumWeight(QueryServicesOptions.DEFAULT_STATS_MAX_CACHE_SIZE)
+                // Defer actual size to the PTableStats.getEstimatedSize()
+                .weigher(new Weigher<GuidePostsKey, GuidePostsInfo>() {
+                    @Override public int weigh(GuidePostsKey key, 
GuidePostsInfo info) {
+                        return info.getEstimatedSize();
+                    }
+                })
+                // Log removals at TRACE for debugging
+                .removalListener(new 
GuidePostsCache.PhoenixStatsCacheRemovalListener())
+                // Automatically load the cache when entries are missing
+                .build(new PhoenixStatsCacheLoader(new TestStatsLoaderImpl(
+                        firstTimeRefreshedSignal, secondTimeRefreshedSignal), 
config));
+
+        try {
+            GuidePostsKey guidePostsKey = new GuidePostsKey(new byte[4], new 
byte[4]);
+            GuidePostsInfo guidePostsInfo = getStats(cache, guidePostsKey);
+            assertTrue(guidePostsInfo.getMaxLength() == 1);
+
+            // Note: With Guava cache, automatic refreshes are performed when 
the first stale request for an entry occurs.
+
+            // After we sleep here for any time which is larger than the 
refresh cycle, the refresh of cache entry will be
+            // triggered for its first time by the call of getStats(). This is 
deterministic behavior, and it won't cause
+            // randomized test failures.
+            sleep(150);
+            guidePostsInfo = getStats(cache, guidePostsKey);
+            // Refresh has been triggered for its first time, but still could 
get the old value
+            assertTrue(guidePostsInfo.getMaxLength() >= 1);
+            firstTimeRefreshedSignal.await();
+
+            sleep(150);
+            guidePostsInfo = getStats(cache, guidePostsKey);
+            // Now the second time refresh has been triggered by the above 
getStats() call, the first time Refresh has completed
+            // and the cache entry has been updated for sure.
+            assertTrue(guidePostsInfo.getMaxLength() >= 2);
+            secondTimeRefreshedSignal.await();
+        }
+        catch (InterruptedException e) {
+            assertFalse(true);
+        }
+    }
+}
\ No newline at end of file

Reply via email to