Repository: phoenix
Updated Branches:
  refs/heads/3.0 a56b2f48c -> 12494b7bf


PHOENIX-1250 Remove use of Closeables.closeQuietly

Remove the use of Guava's Closeables.closeQuietly to allow using
Phoenix within a client-side application that has a more recent
version of Guava. After this commit, Phoenix can be built against
Guava 18.0 (although full integration tests will not work because
HBase/Hadoop still rely on an older version of Guava internally).
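
The call-site change this implies is mechanical. As a minimal sketch (the
"stream" variable is illustrative, not taken from the patch):

    // Before: bound to Guava's Closeables.closeQuietly(Closeable),
    // which newer Guava releases no longer provide.
    import static com.google.common.io.Closeables.closeQuietly;
    ...
    closeQuietly(stream);

    // After: bound to Phoenix's own utility class, leaving the client
    // application free to choose its Guava version.
    import org.apache.phoenix.util.Closeables;
    ...
    Closeables.closeQuietly(stream);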


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/12494b7b
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/12494b7b
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/12494b7b

Branch: refs/heads/3.0
Commit: 12494b7bfd3116d9a68977b63f3fbd9bfca6db5c
Parents: a56b2f4
Author: Gabriel Reid <gabri...@ngdata.com>
Authored: Tue Sep 16 11:58:33 2014 +0200
Committer: Gabriel Reid <gabri...@ngdata.com>
Committed: Wed Sep 17 09:23:24 2014 +0200

----------------------------------------------------------------------
 phoenix-core/pom.xml                            |   4 +
 .../apache/phoenix/cache/ServerCacheClient.java |   4 +-
 .../phoenix/cache/aggcache/SpillFile.java       |   2 +-
 .../phoenix/cache/aggcache/SpillManager.java    |   2 +-
 .../cache/aggcache/SpillableGroupByCache.java   |   2 +-
 .../apache/phoenix/compile/FromCompiler.java    |   2 +-
 .../GroupedAggregateRegionObserver.java         |  32 ++--
 .../phoenix/iterate/TableResultIterator.java    |   2 +-
 .../query/ConnectionQueryServicesImpl.java      | 145 +++++++++----------
 .../org/apache/phoenix/util/Closeables.java     |  46 +++++-
 .../apache/phoenix/flume/sink/PhoenixSink.java  |   7 +-
 pom.xml                                         |   5 +
 12 files changed, 146 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/12494b7b/phoenix-core/pom.xml
----------------------------------------------------------------------
diff --git a/phoenix-core/pom.xml b/phoenix-core/pom.xml
index ad7b802..6d3c0c5 100644
--- a/phoenix-core/pom.xml
+++ b/phoenix-core/pom.xml
@@ -271,6 +271,10 @@
       <artifactId>jackson-xc</artifactId>
     </dependency>
     <dependency>
+      <groupId>com.google.code.findbugs</groupId>
+      <artifactId>jsr305</artifactId>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/phoenix/blob/12494b7b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
index a67c639..a6ee92e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
@@ -17,8 +17,6 @@
  */
 package org.apache.phoenix.cache;
 
-import static com.google.common.io.Closeables.closeQuietly;
-
 import java.io.Closeable;
 import java.io.IOException;
 import java.sql.SQLException;
@@ -263,7 +261,7 @@ public class ServerCacheClient {
                        LOG.warn("Unable to remove hash cache for " + 
remainingOnServers, lastThrowable);
                }
        } finally {
-               closeQuietly(iterateOverTable);
+               Closeables.closeQuietly(iterateOverTable);
        }
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/12494b7b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillFile.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillFile.java 
b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillFile.java
index 31ad5ce..8dd64d0 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillFile.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillFile.java
@@ -28,11 +28,11 @@ import java.nio.channels.FileChannel.MapMode;
 import java.util.Map;
 import java.util.UUID;
 
+import org.apache.phoenix.util.Closeables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Maps;
-import com.google.common.io.Closeables;
 
 /**
  * This class abstracts a SpillFile It is a accessible on a per page basis

http://git-wip-us.apache.org/repos/asf/phoenix/blob/12494b7b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillManager.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillManager.java
index 3f4bf35..2fbea5c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillManager.java
@@ -40,12 +40,12 @@ import org.apache.phoenix.schema.KeyValueSchema;
 import org.apache.phoenix.schema.ValueBitSet;
 import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.Closeables;
 import org.apache.phoenix.util.KeyValueUtil;
 import org.apache.phoenix.util.TupleUtil;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
-import com.google.common.io.Closeables;
 
 /**
 * Class servers as an adapter between the in-memory LRU cache and the Spill data structures. It

http://git-wip-us.apache.org/repos/asf/phoenix/blob/12494b7b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java
index d40431c..2aced38 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java
@@ -51,11 +51,11 @@ import org.apache.phoenix.expression.aggregator.ServerAggregators;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.memory.InsufficientMemoryException;
 import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
+import org.apache.phoenix.util.Closeables;
 import org.apache.phoenix.util.KeyValueUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.io.Closeables;
 
 /**
 * The main entry point is in GroupedAggregateRegionObserver. It instantiates a SpillableGroupByCache and invokes a

http://git-wip-us.apache.org/repos/asf/phoenix/blob/12494b7b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
index efc0973..d5cd240 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
@@ -67,6 +67,7 @@ import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.Closeables;
 import org.apache.phoenix.util.SchemaUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -75,7 +76,6 @@ import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
-import com.google.common.io.Closeables;
 
 /**
 * Validates FROM clause and builds a ColumnResolver for resolving column references

http://git-wip-us.apache.org/repos/asf/phoenix/blob/12494b7b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
index 44e9dfa..3654d03 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
@@ -64,6 +64,7 @@ import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.PDataType;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.util.Closeables;
 import org.apache.phoenix.util.KeyValueUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SizedUtil;
@@ -72,11 +73,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Maps;
-import com.google.common.io.Closeables;
 
 /**
 * Region observer that aggregates grouped rows (i.e. SQL query with GROUP BY clause)
- * 
+ *
  * @since 0.1
  */
 public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
@@ -186,13 +186,13 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
     }
 
     /**
-     * 
+     *
      * Cache for distinct values and their aggregations which is completely
      * in-memory (as opposed to spilling to disk). Used when GROUPBY_SPILLABLE_ATTRIB
      * is set to false. The memory usage is tracked at a coursed grain and will
      * throw and abort if too much is used.
      *
-     * 
+     *
      * @since 3.0.0
      */
     private static final class InMemoryGroupByCache implements GroupByCache {
@@ -200,9 +200,9 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
         private final Map<ImmutableBytesPtr, Aggregator[]> aggregateMap;
         private final ServerAggregators aggregators;
         private final RegionCoprocessorEnvironment env;
-        
+
         private int estDistVals;
-        
+
         InMemoryGroupByCache(RegionCoprocessorEnvironment env, ImmutableBytesWritable tenantId, ServerAggregators aggregators, int estDistVals) {
             int estValueSize = aggregators.getEstimatedByteSize();
             long estSize = sizeOfUnorderedGroupByMap(estDistVals, estValueSize);
@@ -213,7 +213,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
             this.aggregateMap = Maps.newHashMapWithExpectedSize(estDistVals);
             this.chunk = tenantCache.getMemoryManager().allocate(estSize);
         }
-        
+
         @Override
         public void close() throws IOException {
             this.chunk.close();
@@ -252,7 +252,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
             chunk.resize(estSize);
 
             final List<KeyValue> aggResults = new ArrayList<KeyValue>(aggregateMap.size());
-            
+
             final Iterator<Map.Entry<ImmutableBytesPtr, Aggregator[]>> cacheIter =
                     aggregateMap.entrySet().iterator();
             while (cacheIter.hasNext()) {
@@ -306,22 +306,22 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
         public long size() {
             return aggregateMap.size();
         }
-        
+
     }
     private static final class GroupByCacheFactory {
        public static final GroupByCacheFactory INSTANCE = new GroupByCacheFactory();
-        
+
         private GroupByCacheFactory() {
         }
-        
+
         GroupByCache newCache(RegionCoprocessorEnvironment env, ImmutableBytesWritable tenantId, ServerAggregators aggregators, int estDistVals) {
             Configuration conf = env.getConfiguration();
             boolean spillableEnabled =
                     conf.getBoolean(GROUPBY_SPILLABLE_ATTRIB, DEFAULT_GROUPBY_SPILLABLE);
             if (spillableEnabled) {
                 return new SpillableGroupByCache(env, tenantId, aggregators, estDistVals);
-            } 
-            
+            }
+
             return new InMemoryGroupByCache(env, tenantId, aggregators, estDistVals);
         }
     }
@@ -344,16 +344,16 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
         byte[] estDistValsBytes = scan.getAttribute(BaseScannerRegionObserver.ESTIMATED_DISTINCT_VALUES);
         if (estDistValsBytes != null) {
             // Allocate 1.5x estimation
-            estDistVals = Math.max(MIN_DISTINCT_VALUES, 
+            estDistVals = Math.max(MIN_DISTINCT_VALUES,
                             (int) (Bytes.toInt(estDistValsBytes) * 1.5f));
         }
 
         final boolean spillableEnabled =
                 conf.getBoolean(GROUPBY_SPILLABLE_ATTRIB, DEFAULT_GROUPBY_SPILLABLE);
 
-        GroupByCache groupByCache = 
+        GroupByCache groupByCache =
                 GroupByCacheFactory.INSTANCE.newCache(
-                        env, ScanUtil.getTenantId(scan), 
+                        env, ScanUtil.getTenantId(scan),
                         aggregators, estDistVals);
 
         boolean success = false;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/12494b7b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
index 756861b..97ff563 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
@@ -24,10 +24,10 @@ import java.util.List;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Scan;
 
-import com.google.common.io.Closeables;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.Closeables;
 import org.apache.phoenix.util.ServerUtil;
 
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/12494b7b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 3633724..086dc0f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -17,7 +17,6 @@
  */
 package org.apache.phoenix.query;
 
-import static com.google.common.io.Closeables.closeQuietly;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CYCLE_FLAG;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MAX_VALUE;
@@ -104,6 +103,7 @@ import org.apache.phoenix.schema.SequenceKey;
 import org.apache.phoenix.schema.TableAlreadyExistsException;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.Closeables;
 import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.PhoenixContextExecutor;
 import org.apache.phoenix.util.PhoenixRuntime;
@@ -118,7 +118,6 @@ import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import com.google.common.io.Closeables;
 
 public class ConnectionQueryServicesImpl extends DelegateQueryServices implements ConnectionQueryServices {
     private static final Logger logger = LoggerFactory.getLogger(ConnectionQueryServicesImpl.class);
@@ -131,26 +130,26 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
     private final String userName;
     private final ConcurrentHashMap<ImmutableBytesWritable,ConnectionQueryServices> childServices;
     private final StatsManager statsManager;
-    
+
     // Cache the latest meta data here for future connections
     // writes guarded by "latestMetaDataLock"
     private volatile PMetaData latestMetaData;
     private final Object latestMetaDataLock = new Object();
-    
+
     // Lowest HBase version on the cluster.
     private int lowestClusterHBaseVersion = Integer.MAX_VALUE;
     private boolean hasInvalidIndexConfiguration = false;
-    
+
     @GuardedBy("connectionCountLock")
     private int connectionCount = 0;
     private final Object connectionCountLock = new Object();
-    
+
     private HConnection connection;
     private volatile boolean initialized;
-    
+
     // writes guarded by "this"
     private volatile boolean closed;
-    
+
     private volatile SQLException initializationException;
     // setting this member variable guarded by "connectionCountLock"
     private volatile ConcurrentMap<SequenceKey,Sequence> sequenceMap = Maps.newConcurrentMap();
@@ -194,7 +193,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         String hbaseVersion = VersionInfo.getVersion();
         this.kvBuilder = KeyValueBuilder.get(hbaseVersion);
     }
-    
+
     private void openConnection() throws SQLException {
         try {
             // check if we need to authenticate with kerberos
@@ -220,7 +219,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
     public StatsManager getStatsManager() {
         return this.statsManager;
     }
-    
+
     @Override
     public HTableInterface getTable(byte[] tableName) throws SQLException {
         try {
@@ -231,9 +230,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             throw new TableNotFoundException(Bytes.toString(schemaAndTableName[0]), Bytes.toString(schemaAndTableName[1]));
         } catch (IOException e) {
                throw new SQLException(e);
-        } 
+        }
     }
-    
+
     @Override
     public HTableDescriptor getTableDescriptor(byte[] tableName) throws SQLException {
         HTableInterface htable = getTable(tableName);
@@ -276,7 +275,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 sqlE = e;
             } finally {
                 try {
-                    // Clear any client-side caches.  
+                    // Clear any client-side caches.
                     statsManager.clearStats();
                 } catch (SQLException e) {
                     if (sqlE == null) {
@@ -316,7 +315,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 }
             }
         }
-    }    
+    }
 
     protected ConnectionQueryServices newChildQueryService() {
         return new ChildQueryServices(this);
@@ -343,12 +342,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
     public void clearTableRegionCache(byte[] tableName) throws SQLException {
         connection.clearRegionCache(tableName);
     }
-    
+
     @Override
     public List<HRegionLocation> getAllTableRegions(byte[] tableName) throws SQLException {
         /*
          * Use HConnection.getRegionLocation as it uses the cache in HConnection, while getting
-         * all region locations from the HTable doesn't. 
+         * all region locations from the HTable doesn't.
          */
         int retryCount = 0, maxRetryCount = 1;
         boolean reload =false;
@@ -387,8 +386,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 // If existing table isn't older than new table, don't replace
                 // If a client opens a connection at an earlier timestamp, this can happen
                 PTable existingTable = latestMetaData.getTable(new PTableKey(table.getTenantId(), table.getName().getString()));
-                if (existingTable.getTimeStamp() >= table.getTimeStamp()) { 
-                    return latestMetaData; 
+                if (existingTable.getTimeStamp() >= table.getTimeStamp()) {
+                    return latestMetaData;
                 }
             } catch (TableNotFoundException e) {}
             latestMetaData = latestMetaData.addTable(table);
@@ -525,7 +524,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         hcd.setValue(key, value == null ? null : value.toString());
       }
     }
-    
+
     private void addCoprocessors(byte[] tableName, HTableDescriptor descriptor, PTableType tableType) throws SQLException {
         // The phoenix jar must be available on HBase classpath
         try {
@@ -541,7 +540,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             if (!descriptor.hasCoprocessor(ServerCachingEndpointImpl.class.getName())) {
                 descriptor.addCoprocessor(ServerCachingEndpointImpl.class.getName(), null, 1, null);
             }
-            
+
             // TODO: better encapsulation for this
             // Since indexes can't have indexes, don't install our indexing coprocessor for indexes. Also,
             // don't install on the metadata table until we fix the TODO there.
@@ -550,7 +549,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 opts.put(CoveredColumnsIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName());
                 Indexer.enableIndexing(descriptor, PhoenixIndexBuilder.class, opts);
             }
-            
+
             // Setup split policy on Phoenix metadata table to ensure that the key values of a Phoenix table
             // stay on the same region.
             if (SchemaUtil.isMetaTable(tableName)) {
@@ -569,9 +568,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             throw ServerUtil.parseServerException(e);
         }
     }
-    
+
     private HTableDescriptor generateTableDescriptor(byte[] tableName, HTableDescriptor existingDesc, PTableType tableType, Map<String,Object> tableProps, List<Pair<byte[],Map<String,Object>>> families, byte[][] splits) throws SQLException {
-        String defaultFamilyName = (String)tableProps.remove(PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME);               
+        String defaultFamilyName = (String)tableProps.remove(PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME);
         HTableDescriptor descriptor = (existingDesc != null) ? new HTableDescriptor(existingDesc) : new HTableDescriptor(tableName);
         for (Entry<String,Object> entry : tableProps.entrySet()) {
             String key = entry.getKey();
@@ -581,7 +580,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         if (families.isEmpty()) {
             if (tableType != PTableType.VIEW) {
                 byte[] defaultFamilyByes = defaultFamilyName == null ? QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES : Bytes.toBytes(defaultFamilyName);
-                // Add dummy column family so we have key values for tables that 
+                // Add dummy column family so we have key values for tables that
                 HColumnDescriptor columnDescriptor = generateColumnFamilyDescriptor(new Pair<byte[],Map<String,Object>>(defaultFamilyByes,Collections.<String,Object>emptyMap()), tableType);
                 descriptor.addFamily(columnDescriptor);
             }
@@ -639,7 +638,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                     }
                     modifyColumnFamilyDescriptor(columnDescriptor, family);
                 }
-                
+
                 if (columnDescriptor.equals(oldDescriptor)) {
                     // Table already has family and it's the same.
                     return;
@@ -674,9 +673,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             }
         }
     }
-    
+
     /**
-     * 
+     *
      * @param tableName
      * @param splits
      * @param modifyExistingMetaData TODO
@@ -707,7 +706,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             }
 
             HTableDescriptor newDesc = generateTableDescriptor(tableName, existingDesc, tableType , props, families, splits);
-            
+
             if (!tableExist) {
                 /*
                  * Remove the splitPolicy attribute due to an HBase bug (see below)
@@ -732,7 +731,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                      * Now we modify the table to add the split policy, since we know that the client and
                      * server and compatible. This works around a nasty, known HBase bug where if a split
                      * policy class cannot be found on the server, the HBase table is left in a horrible
-                     * "ghost" state where it can't be used and can't be deleted without bouncing the master. 
+                     * "ghost" state where it can't be used and can't be deleted without bouncing the master.
                      */
                     newDesc.setValue(HTableDescriptor.SPLIT_POLICY, MetaDataSplitPolicy.class.getName());
                     admin.disableTable(tableName);
@@ -752,12 +751,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 if (isMetaTable) {
                     checkClientServerCompatibility();
                 }
-                         
+
                 // TODO: Take advantage of online schema change ability by setting "hbase.online.schema.update.enable" to true
                 admin.disableTable(tableName);
                 admin.modifyTable(tableName, newDesc);
                 admin.enableTable(tableName);
-                
+
                 return newDesc;
             }
 
@@ -789,7 +788,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         }
         return !MetaDataUtil.decodeMutableIndexConfiguredProperly(serverVersion);
     }
-    
+
     private static boolean isCompatible(Long serverVersion) {
         if (serverVersion == null) {
             return false;
@@ -820,7 +819,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                         public Long call(MetaDataProtocol instance) throws IOException {
                             return instance.getVersion();
                         }
-                    }, 
+                    },
                     new Batch.Callback<Long>(){
                         @Override
                         public void update(byte[] region, byte[] row, Long value) {
@@ -865,7 +864,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 List<byte[]> regionKeys = Collections.singletonList(regionLocation.getRegionInfo().getStartKey());
                 final Map<byte[],MetaDataMutationResult> results = Maps.newHashMapWithExpectedSize(1);
                 connection.processExecs(MetaDataProtocol.class, regionKeys,
-                        PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, this.getDelegate().getExecutor(), callable, 
+                        PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, this.getDelegate().getExecutor(), callable,
                         new Batch.Callback<MetaDataMutationResult>(){
                             @Override
                             public void update(byte[] region, byte[] row, MetaDataMutationResult value) {
@@ -887,7 +886,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             throw new SQLException(t);
         }
     }
-    
+
     // Our property values are translated using toString, so we need to "string-ify" this.
     private static final String TRUE_BYTES_AS_STRING = Bytes.toString(PDataType.TRUE_BYTES);
 
@@ -897,7 +896,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             maxFileSize = this.config.getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE);
         }
         byte[] physicalIndexName = MetaDataUtil.getViewIndexPhysicalName(physicalTableName);
-        
+
         int indexMaxFileSizePerc;
         // Get percentage to use from table props first and then fallback to config
         Integer indexMaxFileSizePercProp = (Integer)tableProps.remove(QueryServices.INDEX_MAX_FILESIZE_PERC_ATTRIB);
@@ -945,7 +944,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 // Ignore, as we may never have created a view index table
             }
         } catch (IOException e) {
-            throw ServerUtil.parseServerException(e); 
+            throw ServerUtil.parseServerException(e);
         } finally {
             try {
                 if (admin != null) admin.close();
@@ -955,7 +954,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         }
         return wasDeleted;
     }
-    
+
     @Override
     public MetaDataMutationResult createTable(final List<Mutation> tableMetaData, byte[] physicalTableName, PTableType tableType,
             Map<String,Object> tableProps, final List<Pair<byte[],Map<String,Object>>> families, byte[][] splits) throws SQLException {
@@ -999,7 +998,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             }
             ensureViewIndexTableCreated(tableName, tableProps, familiesPlusDefault, MetaDataUtil.isSalted(m, kvBuilder, ptr) ? splits : null, MetaDataUtil.getClientTimeStamp(m));
         }
-        
+
         byte[] tableKey = SchemaUtil.getTableKey(tenantIdBytes, schemaBytes, tableBytes);
         MetaDataMutationResult result = metaDataCoprocessorExec(tableKey,
             new Batch.Call<MetaDataProtocol, MetaDataMutationResult>() {
@@ -1040,7 +1039,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                      return instance.dropTable(tableMetaData, tableType.getSerializedValue(), cascade);
                     }
                 });
-        
+
         final MutationCode code = result.getMutationCode();
         switch(code) {
         case TABLE_ALREADY_EXISTS:
@@ -1060,7 +1059,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         }
           return result;
     }
-    
+
     private void dropTables(final List<byte[]> tableNamesToDelete) throws SQLException {
         HBaseAdmin admin = null;
         SQLException sqlE = null;
@@ -1075,7 +1074,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                     }
                 }
             }
-            
+
         } catch (IOException e) {
             sqlE = ServerUtil.parseServerException(e);
         } finally {
@@ -1106,11 +1105,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         }
         return props;
     }
-    
+
     private void ensureViewIndexTableCreated(PName tenantId, byte[] physicalIndexTableName, long timestamp) throws SQLException {
         PTable table;
         String name = Bytes.toString(
-                physicalIndexTableName, 
+                physicalIndexTableName,
                 MetaDataUtil.VIEW_INDEX_TABLE_PREFIX_BYTES.length,
                 physicalIndexTableName.length-MetaDataUtil.VIEW_INDEX_TABLE_PREFIX_BYTES.length);
         try {
@@ -1133,7 +1132,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         }
         ensureViewIndexTableCreated(table, timestamp);
     }
-    
+
     private void ensureViewIndexTableCreated(PTable table, long timestamp) throws SQLException {
         byte[] physicalTableName = table.getPhysicalName().getBytes();
         HTableDescriptor htableDesc = this.getTableDescriptor(physicalTableName);
@@ -1157,10 +1156,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         if (table.getBucketNum() != null) {
             splits = SaltingUtil.getSalteByteSplitPoints(table.getBucketNum());
         }
-        
+
         ensureViewIndexTableCreated(physicalTableName, tableProps, families, splits, timestamp);
     }
-    
+
     @Override
     public MetaDataMutationResult addColumn(final List<Mutation> tableMetaData, List<Pair<byte[],Map<String,Object>>> families, PTable table) throws SQLException {
         byte[][] rowKeyMetaData = new byte[3][];
@@ -1199,7 +1198,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                && Boolean.FALSE.equals(PDataType.BOOLEAN.toObject(ptr))) {
                 flushTable(table.getPhysicalName().getBytes());
             }
-            
+
             if (tableType == PTableType.TABLE) {
                 // If we're changing MULTI_TENANT to true or false, create or drop the view index table
                 if (MetaDataUtil.getMutationValue(m, PhoenixDatabaseMetaData.MULTI_TENANT_BYTES, kvBuilder, ptr)){
@@ -1243,11 +1242,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             break;
         }
         return result;
-       
+
     }
 
     // Keeping this to use for further upgrades
-    protected PhoenixConnection addColumnsIfNotExists(PhoenixConnection oldMetaConnection, 
+    protected PhoenixConnection addColumnsIfNotExists(PhoenixConnection oldMetaConnection,
         String tableName, long timestamp, String columns) throws SQLException {
 
         Properties props = new Properties(oldMetaConnection.getClientInfo());
@@ -1276,7 +1275,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         }
         return metaConnection;
     }
-    
+
     @Override
     public void init(final String url, final Properties props) throws SQLException {
         try {
@@ -1317,10 +1316,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                             } catch (TableAlreadyExistsException ignore) {
                                 // This will occur if we have an older SYSTEM.CATALOG and we need to update it to include
                                 // any new columns we've added.
-                                metaConnection = addColumnsIfNotExists(metaConnection, 
-                                  PhoenixDatabaseMetaData.SYSTEM_CATALOG, 
-                                  MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP, 
-                                  PhoenixDatabaseMetaData.INDEX_TYPE + " " + PDataType.UNSIGNED_TINYINT.getSqlTypeName() + 
+                                metaConnection = addColumnsIfNotExists(metaConnection,
+                                  PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+                                  MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP,
+                                  PhoenixDatabaseMetaData.INDEX_TYPE + " " + PDataType.UNSIGNED_TINYINT.getSqlTypeName() +
                                   ", " + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP + " " + PDataType.LONG.getSqlTypeName());
                             }
                             try {
@@ -1332,12 +1331,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                                 // This will occur if we have an older SYSTEM.SEQUENCE, so we need to update it to include
                                 // any new columns we've added.
                                 String newColumns =
-                                        MIN_VALUE + " " + PDataType.LONG.getSqlTypeName() + ", " 
-                                                + MAX_VALUE + " " + PDataType.LONG.getSqlTypeName() + ", " 
-                                                + CYCLE_FLAG + " " + PDataType.BOOLEAN.getSqlTypeName() + ", " 
+                                        MIN_VALUE + " " + PDataType.LONG.getSqlTypeName() + ", "
+                                                + MAX_VALUE + " " + PDataType.LONG.getSqlTypeName() + ", "
+                                                + CYCLE_FLAG + " " + PDataType.BOOLEAN.getSqlTypeName() + ", "
                                                 + LIMIT_REACHED_FLAG + " " + PDataType.BOOLEAN.getSqlTypeName();
-                                metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME, 
-                                    MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP, newColumns); 
+                                metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME,
+                                    MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP, newColumns);
                             }
                         } catch (Exception e) {
                             if (e instanceof SQLException) {
@@ -1490,7 +1489,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             } catch (IOException e) {
                 throw ServerUtil.parseServerException(e);
             } finally {
-                closeQuietly(htable);
+                Closeables.closeQuietly(htable);
             }
         } finally {
             sequence.getLock().unlock();
@@ -1516,7 +1515,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             } catch (IOException e) {
                 throw ServerUtil.parseServerException(e);
             } finally {
-                closeQuietly(htable);
+                Closeables.closeQuietly(htable);
             }
         } finally {
             sequence.getLock().unlock();
@@ -1527,7 +1526,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
      * Gets the current sequence value
      * @param tenantId
      * @param sequence
-     * @throws SQLException if cached sequence cannot be found 
+     * @throws SQLException if cached sequence cannot be found
      */
     @Override
     public long currentSequenceValue(SequenceKey sequenceKey, long timestamp) throws SQLException {
@@ -1548,7 +1547,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             sequence.getLock().unlock();
         }
     }
-    
+
     /**
      * Verifies that sequences exist and reserves values for them if reserveValues is true
      */
@@ -1556,17 +1555,17 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
     public void validateSequences(List<SequenceKey> sequenceKeys, long timestamp, long[] values, SQLException[] exceptions, Sequence.ValueOp action) throws SQLException {
         incrementSequenceValues(sequenceKeys, timestamp, values, exceptions, 0, action);
     }
-    
+
     /**
      * Increment any of the set of sequences that need more values. These are the sequences
      * that are asking for the next value within a given statement. The returned sequences
-     * are the ones that were not found because they were deleted by another client. 
+     * are the ones that were not found because they were deleted by another client.
      * @param tenantId
      * @param sequenceKeys sorted list of sequence kyes
      * @param batchSize
      * @param timestamp
      * @throws SQLException if any of the sequences cannot be found
-     * 
+     *
      */
     @Override
     public void incrementSequences(List<SequenceKey> sequenceKeys, long timestamp, long[] values, SQLException[] exceptions) throws SQLException {
@@ -1752,7 +1751,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             }
         }
     }
-    
+
     @Override
     public void addConnection(PhoenixConnection connection) throws SQLException {
         synchronized (connectionCountLock) {
@@ -1790,26 +1789,26 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         // For now, only Feature is REVERSE_SCAN and it's not supported in any version yet
         return false;
     }
-    
+
     @Override
     public String getUserName() {
         return userName;
     }
-    
+
     private void checkClosed() {
         if (closed) {
             throwConnectionClosedException();
         }
     }
-    
+
     private void throwConnectionClosedIfNullMetaData() {
         if (latestMetaData == null) {
             throwConnectionClosedException();
         }
     }
-    
+
     private void throwConnectionClosedException() {
         throw new IllegalStateException("Connection to the cluster is closed");
     }
-    
+
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/12494b7b/phoenix-core/src/main/java/org/apache/phoenix/util/Closeables.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/Closeables.java b/phoenix-core/src/main/java/org/apache/phoenix/util/Closeables.java
index c4b15dc..3046929 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/Closeables.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/Closeables.java
@@ -17,11 +17,16 @@
  */
 package org.apache.phoenix.util;
 
+import com.google.common.collect.Iterables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.*;
-
-import com.google.common.collect.Iterables;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
 
 
 /**
@@ -29,10 +34,38 @@ import com.google.common.collect.Iterables;
  * 
  */
 public class Closeables {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(Closeables.class);
+
     /** Not constructed */
     private Closeables() { }
     
     /**
+     * Close a {@code Closeable}, returning an {@code IOException} if it occurs while closing
+     * instead of throwing it. This is nearly a clone of the Guava Closeables.closeQuietly method
+     * which has long since been removed from Guava.
+     *
+     * Use of this method should be avoided -- quietly swallowing IOExceptions (particularly on
+     * Closeables that are being written to) is a code smell. Use of the equivalent method in
+     * Guava was done for this reason.
+     *
+     * @param closeable the Closeable to be closed, can be null
+     * @return the IOException if one was thrown, otherwise {@code null}
+     */
+    public static IOException closeQuietly(@Nullable Closeable closeable) {
+        if (closeable == null) {
+            return null;
+        }
+        try {
+            closeable.close();
+            return null;
+        } catch (IOException e) {
+            LOGGER.error("Error closing " + closeable, e);
+            return e;
+        }
+    }
+
+    /**
      * Allows you to close as many of the {@link Closeable}s as possible.
      * 
      * If any of the close's fail with an IOException, those exception(s) will
@@ -48,11 +81,10 @@ public class Closeables {
         
         LinkedList<IOException> exceptions = null;
         for (Closeable closeable : iterable) {
-            try {
-                closeable.close();
-            } catch (IOException x) {
+            IOException ioe = closeQuietly(closeable);
+            if (ioe != null) {
                 if (exceptions == null) exceptions = new LinkedList<IOException>();
-                exceptions.add(x);
+                exceptions.add(ioe);
             }
         }
         

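Note that the new Phoenix closeQuietly logs and returns the IOException
rather than rethrowing it, so callers can still aggregate close failures
(as closeAll above now does). A brief usage sketch, with illustrative
variable names not taken from the patch:

    import java.io.Closeable;
    import java.io.IOException;
    import org.apache.phoenix.util.Closeables;

    // Ignore a close failure outright (the old Guava-style behavior):
    Closeables.closeQuietly(inputStream);

    // Or capture the failure for later handling:
    IOException ioe = Closeables.closeQuietly(outputStream);
    if (ioe != null) {
        // e.g. collect it, or rethrow it wrapped in a domain exception
    }
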
http://git-wip-us.apache.org/repos/asf/phoenix/blob/12494b7b/phoenix-flume/src/main/java/org/apache/phoenix/flume/sink/PhoenixSink.java
----------------------------------------------------------------------
diff --git a/phoenix-flume/src/main/java/org/apache/phoenix/flume/sink/PhoenixSink.java b/phoenix-flume/src/main/java/org/apache/phoenix/flume/sink/PhoenixSink.java
index efcbef6..f9c929d 100644
--- a/phoenix-flume/src/main/java/org/apache/phoenix/flume/sink/PhoenixSink.java
+++ b/phoenix-flume/src/main/java/org/apache/phoenix/flume/sink/PhoenixSink.java
@@ -39,7 +39,6 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Stopwatch;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 
@@ -140,7 +139,7 @@ public final class PhoenixSink  extends AbstractSink implements Configurable {
         Channel channel = getChannel();
         Transaction transaction = null;
         List<Event>  events = Lists.newArrayListWithExpectedSize(this.batchSize); 
-        Stopwatch watch = new Stopwatch().start();
+        long startTime = System.nanoTime();
         try {
             transaction = channel.getTransaction();
             transaction.begin();
@@ -194,7 +193,7 @@ public final class PhoenixSink  extends AbstractSink implements Configurable {
             throw new EventDeliveryException("Failed to persist message", e);
         }
         finally {
-            logger.error(String.format("Time taken to process [%s] events was [%s] seconds",events.size(),watch.stop().elapsedTime(TimeUnit.SECONDS)));
+            logger.info(String.format("Time taken to process [%s] events was [%s] seconds",
+                    events.size(),
+                    TimeUnit.SECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS)));
             if( transaction != null ) {
                 transaction.close();
             }

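Guava's Stopwatch API also changed across Guava versions, so the sink now
times batches with plain System.nanoTime(). A self-contained sketch of
the same pattern (illustrative only, not part of the patch):

    import java.util.concurrent.TimeUnit;

    long startTime = System.nanoTime();
    // ... process a batch of events ...
    long elapsedSeconds = TimeUnit.SECONDS.convert(
            System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
    System.out.println("Batch took " + elapsedSeconds + " seconds");
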
http://git-wip-us.apache.org/repos/asf/phoenix/blob/12494b7b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 7be22ca..d2d30f0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -523,6 +523,11 @@
         <scope>compile</scope>
       </dependency>
       <dependency>
+        <groupId>com.google.code.findbugs</groupId>
+        <artifactId>jsr305</artifactId>
+        <version>2.0.1</version>
+      </dependency>
+      <dependency>
         <groupId>org.codehaus.jackson</groupId>
         <artifactId>jackson-jaxrs</artifactId>
         <version>${jackson.version}</version>

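The jsr305 dependency added here and in phoenix-core supplies the
javax.annotation.Nullable annotation used on the new closeQuietly
parameter; the artifact contains annotations only and adds no runtime
behavior. A minimal illustration (hypothetical method, not from the
patch):

    import javax.annotation.Nullable;

    // @Nullable documents that callers may legitimately pass null.
    static String orEmpty(@Nullable String value) {
        return value == null ? "" : value;
    }
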