NihalJain commented on code in PR #2024:
URL: https://github.com/apache/phoenix/pull/2024#discussion_r2047043258


##########
phoenix-core-client/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java:
##########
@@ -79,485 +78,497 @@
 import org.slf4j.LoggerFactory;
 
 /**
- * 
  * Client for sending cache to each region server
- * 
- * 
  * @since 0.1
  */
 public class ServerCacheClient {
-    public static final int UUID_LENGTH = Bytes.SIZEOF_LONG;
-    public static final byte[] KEY_IN_FIRST_REGION = new byte[]{0};
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(ServerCacheClient.class);
-    private static final Random RANDOM = new Random();
-       public static final String HASH_JOIN_SERVER_CACHE_RESEND_PER_SERVER = 
"hash.join.server.cache.resend.per.server";
-    private final PhoenixConnection connection;
-    private final Map<Integer, PTable> cacheUsingTableMap = new 
ConcurrentHashMap<Integer, PTable>();
+  public static final int UUID_LENGTH = Bytes.SIZEOF_LONG;
+  public static final byte[] KEY_IN_FIRST_REGION = new byte[] { 0 };
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ServerCacheClient.class);
+  private static final Random RANDOM = new Random();
+  public static final String HASH_JOIN_SERVER_CACHE_RESEND_PER_SERVER =
+    "hash.join.server.cache.resend.per.server";
+  private final PhoenixConnection connection;
+  private final Map<Integer, PTable> cacheUsingTableMap = new 
ConcurrentHashMap<Integer, PTable>();
 
-    /**
-     * Construct client used to create a serialized cached snapshot of a table 
and send it to each region server
-     * for caching during hash join processing.
-     * @param connection the client connection
-     * 
-     * TODO: instead of minMaxKeyRange, have an interface for iterating 
through ranges as we may be sending to
-     * servers when we don't have to if the min is in first region and max is 
in last region, especially for point queries.
-     */
-    public ServerCacheClient(PhoenixConnection connection) {
-        this.connection = connection;
-    }
-
-    public PhoenixConnection getConnection() {
-        return connection;
-    }
-    
-    /**
-     * Client-side representation of a server cache.  Call {@link #close()} 
when usage
-     * is complete to free cache up on region server
-     *
-     * 
-     * @since 0.1
-     */
-    public class ServerCache implements SQLCloseable {
-        private final int size;
-        private final byte[] id;
-        private final Map<HRegionLocation, Long> servers;
-        private ImmutableBytesWritable cachePtr;
-        private MemoryChunk chunk;
-        private File outputFile;
-        private long maxServerCacheTTL;
-        
-        
-        public ServerCache(byte[] id, Set<HRegionLocation> servers, 
ImmutableBytesWritable cachePtr,
-                ConnectionQueryServices services, boolean storeCacheOnClient) 
throws IOException {
-            maxServerCacheTTL = services.getProps().getInt(
-                    QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB,
-                    
QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS);
-            this.id = id;
-            this.servers = new HashMap();
-            long currentTime = EnvironmentEdgeManager.currentTimeMillis();
-            for(HRegionLocation loc : servers) {
-                this.servers.put(loc, currentTime);
-            }
-            this.size =  cachePtr.getLength();
-            if (storeCacheOnClient) {
-                try {
-                    this.chunk = 
services.getMemoryManager().allocate(cachePtr.getLength());
-                    this.cachePtr = cachePtr;
-                } catch (InsufficientMemoryException e) {
-                    this.outputFile = 
File.createTempFile("HashJoinCacheSpooler", ".bin", new File(services.getProps()
-                            .get(QueryServices.SPOOL_DIRECTORY, 
QueryServicesOptions.DEFAULT_SPOOL_DIRECTORY)));
-                    try (OutputStream fio = 
Files.newOutputStream(outputFile.toPath())) {
-                        fio.write(cachePtr.get(), cachePtr.getOffset(), 
cachePtr.getLength());
-                    }
-                }
-            }
-            
-        }
+  /**
+   * Construct client used to create a serialized cached snapshot of a table 
and send it to each
+   * region server for caching during hash join processing.
+   * @param connection the client connection TODO: instead of minMaxKeyRange, 
have an interface for
+   *                   iterating through ranges as we may be sending to 
servers when we don't have
+   *                   to if the min is in first region and max is in last 
region, especially for
+   *                   point queries.
+   */
+  public ServerCacheClient(PhoenixConnection connection) {
+    this.connection = connection;
+  }
 
-        public ImmutableBytesWritable getCachePtr() throws IOException {
-            if(this.outputFile!=null){
-                try (InputStream fio = 
Files.newInputStream(outputFile.toPath())) {
-                    byte[] b = new byte[this.size];
-                    fio.read(b);
-                    cachePtr = new ImmutableBytesWritable(b);
-                }
-            }
-            return cachePtr;
-        }
+  public PhoenixConnection getConnection() {
+    return connection;
+  }
 
-        /**
-         * Gets the size in bytes of hash cache
-         */
-        public int getSize() {
-            return size;
-        }
+  /**
+   * Client-side representation of a server cache. Call {@link #close()} when 
usage is complete to
+   * free cache up on region server
+   * @since 0.1
+   */
+  public class ServerCache implements SQLCloseable {
+    private final int size;
+    private final byte[] id;
+    private final Map<HRegionLocation, Long> servers;
+    private ImmutableBytesWritable cachePtr;
+    private MemoryChunk chunk;
+    private File outputFile;
+    private long maxServerCacheTTL;
 
-        /**
-         * Gets the unique identifier for this hash cache
-         */
-        public byte[] getId() {
-            return id;
+    public ServerCache(byte[] id, Set<HRegionLocation> servers, 
ImmutableBytesWritable cachePtr,
+      ConnectionQueryServices services, boolean storeCacheOnClient) throws 
IOException {
+      maxServerCacheTTL =
+        
services.getProps().getInt(QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB,
+          QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS);
+      this.id = id;
+      this.servers = new HashMap();
+      long currentTime = EnvironmentEdgeManager.currentTimeMillis();
+      for (HRegionLocation loc : servers) {
+        this.servers.put(loc, currentTime);
+      }
+      this.size = cachePtr.getLength();
+      if (storeCacheOnClient) {
+        try {
+          this.chunk = 
services.getMemoryManager().allocate(cachePtr.getLength());
+          this.cachePtr = cachePtr;
+        } catch (InsufficientMemoryException e) {
+          this.outputFile =
+            File.createTempFile("HashJoinCacheSpooler", ".bin", new 
File(services.getProps()
+              .get(QueryServices.SPOOL_DIRECTORY, 
QueryServicesOptions.DEFAULT_SPOOL_DIRECTORY)));
+          try (OutputStream fio = Files.newOutputStream(outputFile.toPath())) {
+            fio.write(cachePtr.get(), cachePtr.getOffset(), 
cachePtr.getLength());
+          }
         }
+      }
 
-        public boolean addServer(HRegionLocation loc) {
-            if(this.servers.containsKey(loc)) {
-                return false;
-            } else {
-                this.servers.put(loc, 
EnvironmentEdgeManager.currentTimeMillis());
-                return true;
-            }
-        }
+    }
 
-        public boolean isExpired(HRegionLocation loc) {
-            if(this.servers.containsKey(loc)) {
-                Long time = this.servers.get(loc);
-                if(EnvironmentEdgeManager.currentTimeMillis() - time > 
maxServerCacheTTL)
-                    return true; // cache was send more than maxTTL ms ago, 
expecting that it's expired
-            } else {
-                return false; // should be on server yet.
-            }
-            return false; // Unknown region location. Need to send the cache.
+    public ImmutableBytesWritable getCachePtr() throws IOException {
+      if (this.outputFile != null) {
+        try (InputStream fio = Files.newInputStream(outputFile.toPath())) {
+          byte[] b = new byte[this.size];
+          fio.read(b);
+          cachePtr = new ImmutableBytesWritable(b);
         }
+      }
+      return cachePtr;
+    }
 
+    /**
+     * Gets the size in bytes of hash cache
+     */
+    public int getSize() {
+      return size;
+    }
 
-        
-        /**
-         * Call to free up cache on region servers when no longer needed
-         */
-        @Override
-        public void close() throws SQLException {
-            try{
-                removeServerCache(this, servers.keySet());
-            }finally{
-                cachePtr = null;
-                if (chunk != null) {
-                    chunk.close();
-                }
-                if (outputFile != null) {
-                    outputFile.delete();
-                }
-            }
-        }
+    /**
+     * Gets the unique identifier for this hash cache
+     */
+    public byte[] getId() {
+      return id;
     }
 
-    public ServerCache createServerCache(byte[] cacheId, QueryPlan delegate)
-            throws SQLException, IOException {
-        PTable cacheUsingTable = delegate.getTableRef().getTable();
-        ConnectionQueryServices services = 
delegate.getContext().getConnection().getQueryServices();
-        List<HRegionLocation> locations = services.getAllTableRegions(
-                cacheUsingTable.getPhysicalName().getBytes(),
-                
delegate.getContext().getStatement().getQueryTimeoutInMillis());
-        int nRegions = locations.size();
-        Set<HRegionLocation> servers = new HashSet<>(nRegions);
-        cacheUsingTableMap.put(Bytes.mapKey(cacheId), cacheUsingTable);
-        return new ServerCache(cacheId, servers, new ImmutableBytesWritable(
-                new byte[]{}), services, false);
+    public boolean addServer(HRegionLocation loc) {
+      if (this.servers.containsKey(loc)) {
+        return false;
+      } else {
+        this.servers.put(loc, EnvironmentEdgeManager.currentTimeMillis());
+        return true;
+      }
     }
 
-    public ServerCache addServerCache(
-            ScanRanges keyRanges, final ImmutableBytesWritable cachePtr, final 
byte[] txState,
-            final ServerCacheFactory cacheFactory, final PTable 
cacheUsingTable)
-            throws SQLException {
-        return addServerCache(keyRanges, cachePtr, txState, cacheFactory, 
cacheUsingTable, false);
+    public boolean isExpired(HRegionLocation loc) {
+      if (this.servers.containsKey(loc)) {
+        Long time = this.servers.get(loc);
+        if (EnvironmentEdgeManager.currentTimeMillis() - time > maxServerCacheTTL) {
+          // cache was sent more than maxTTL ms ago, expecting that it's expired
+          return true;
+        }

Review Comment:
   manually fix this



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@phoenix.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to