Repository: phoenix Updated Branches: refs/heads/3.0 a56b2f48c -> 12494b7bf
PHOENIX-1250 Remove use of Closeables.closeQuietly Remove the use of Guava's Closeables.closeQuietly to allow using Phoenix within a client-side application that has a more recent version of Guava. After this commit, Phoenix can be built against Guava 18.0 (although full integration tests will not work because HBase/Hadoop still rely on an older version of Guava internally). Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/12494b7b Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/12494b7b Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/12494b7b Branch: refs/heads/3.0 Commit: 12494b7bfd3116d9a68977b63f3fbd9bfca6db5c Parents: a56b2f4 Author: Gabriel Reid <gabri...@ngdata.com> Authored: Tue Sep 16 11:58:33 2014 +0200 Committer: Gabriel Reid <gabri...@ngdata.com> Committed: Wed Sep 17 09:23:24 2014 +0200 ---------------------------------------------------------------------- phoenix-core/pom.xml | 4 + .../apache/phoenix/cache/ServerCacheClient.java | 4 +- .../phoenix/cache/aggcache/SpillFile.java | 2 +- .../phoenix/cache/aggcache/SpillManager.java | 2 +- .../cache/aggcache/SpillableGroupByCache.java | 2 +- .../apache/phoenix/compile/FromCompiler.java | 2 +- .../GroupedAggregateRegionObserver.java | 32 ++-- .../phoenix/iterate/TableResultIterator.java | 2 +- .../query/ConnectionQueryServicesImpl.java | 145 +++++++++---------- .../org/apache/phoenix/util/Closeables.java | 46 +++++- .../apache/phoenix/flume/sink/PhoenixSink.java | 7 +- pom.xml | 5 + 12 files changed, 146 insertions(+), 107 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/12494b7b/phoenix-core/pom.xml ---------------------------------------------------------------------- diff --git a/phoenix-core/pom.xml b/phoenix-core/pom.xml index ad7b802..6d3c0c5 100644 --- a/phoenix-core/pom.xml +++ b/phoenix-core/pom.xml @@ -271,6 +271,10 @@ <artifactId>jackson-xc</artifactId> </dependency> <dependency> + <groupId>com.google.code.findbugs</groupId> + <artifactId>jsr305</artifactId> + </dependency> + <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/phoenix/blob/12494b7b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java index a67c639..a6ee92e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java @@ -17,8 +17,6 @@ */ package org.apache.phoenix.cache; -import static com.google.common.io.Closeables.closeQuietly; - import java.io.Closeable; import java.io.IOException; import java.sql.SQLException; @@ -263,7 +261,7 @@ public class ServerCacheClient { LOG.warn("Unable to remove hash cache for " + remainingOnServers, lastThrowable); } } finally { - closeQuietly(iterateOverTable); + Closeables.closeQuietly(iterateOverTable); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/12494b7b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillFile.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillFile.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillFile.java index 31ad5ce..8dd64d0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillFile.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillFile.java @@ -28,11 +28,11 @@ import java.nio.channels.FileChannel.MapMode; import java.util.Map; import java.util.UUID; +import org.apache.phoenix.util.Closeables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.collect.Maps; -import com.google.common.io.Closeables; /** * This class abstracts a SpillFile It is a accessible on a per page basis http://git-wip-us.apache.org/repos/asf/phoenix/blob/12494b7b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillManager.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillManager.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillManager.java index 3f4bf35..2fbea5c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillManager.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillManager.java @@ -40,12 +40,12 @@ import org.apache.phoenix.schema.KeyValueSchema; import org.apache.phoenix.schema.ValueBitSet; import org.apache.phoenix.schema.tuple.SingleKeyValueTuple; import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.util.Closeables; import org.apache.phoenix.util.KeyValueUtil; import org.apache.phoenix.util.TupleUtil; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; -import com.google.common.io.Closeables; /** * Class servers as an adapter between the in-memory LRU cache and the Spill data structures. It http://git-wip-us.apache.org/repos/asf/phoenix/blob/12494b7b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java index d40431c..2aced38 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java @@ -51,11 +51,11 @@ import org.apache.phoenix.expression.aggregator.ServerAggregators; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.memory.InsufficientMemoryException; import org.apache.phoenix.memory.MemoryManager.MemoryChunk; +import org.apache.phoenix.util.Closeables; import org.apache.phoenix.util.KeyValueUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.io.Closeables; /** * The main entry point is in GroupedAggregateRegionObserver. It instantiates a SpillableGroupByCache and invokes a http://git-wip-us.apache.org/repos/asf/phoenix/blob/12494b7b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java index efc0973..d5cd240 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java @@ -67,6 +67,7 @@ import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.util.Closeables; import org.apache.phoenix.util.SchemaUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,7 +76,6 @@ import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ImmutableList; import com.google.common.collect.ListMultimap; import com.google.common.collect.Lists; -import com.google.common.io.Closeables; /** * Validates FROM clause and builds a ColumnResolver for resolving column references http://git-wip-us.apache.org/repos/asf/phoenix/blob/12494b7b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java index 44e9dfa..3654d03 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java @@ -64,6 +64,7 @@ import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.PDataType; import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.tuple.MultiKeyValueTuple; +import org.apache.phoenix.util.Closeables; import org.apache.phoenix.util.KeyValueUtil; import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.SizedUtil; @@ -72,11 +73,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.collect.Maps; -import com.google.common.io.Closeables; /** * Region observer that aggregates grouped rows (i.e. SQL query with GROUP BY clause) - * + * * @since 0.1 */ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { @@ -186,13 +186,13 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { } /** - * + * * Cache for distinct values and their aggregations which is completely * in-memory (as opposed to spilling to disk). Used when GROUPBY_SPILLABLE_ATTRIB * is set to false. The memory usage is tracked at a coursed grain and will * throw and abort if too much is used. * - * + * * @since 3.0.0 */ private static final class InMemoryGroupByCache implements GroupByCache { @@ -200,9 +200,9 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { private final Map<ImmutableBytesPtr, Aggregator[]> aggregateMap; private final ServerAggregators aggregators; private final RegionCoprocessorEnvironment env; - + private int estDistVals; - + InMemoryGroupByCache(RegionCoprocessorEnvironment env, ImmutableBytesWritable tenantId, ServerAggregators aggregators, int estDistVals) { int estValueSize = aggregators.getEstimatedByteSize(); long estSize = sizeOfUnorderedGroupByMap(estDistVals, estValueSize); @@ -213,7 +213,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { this.aggregateMap = Maps.newHashMapWithExpectedSize(estDistVals); this.chunk = tenantCache.getMemoryManager().allocate(estSize); } - + @Override public void close() throws IOException { this.chunk.close(); @@ -252,7 +252,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { chunk.resize(estSize); final List<KeyValue> aggResults = new ArrayList<KeyValue>(aggregateMap.size()); - + final Iterator<Map.Entry<ImmutableBytesPtr, Aggregator[]>> cacheIter = aggregateMap.entrySet().iterator(); while (cacheIter.hasNext()) { @@ -306,22 +306,22 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { public long size() { return aggregateMap.size(); } - + } private static final class GroupByCacheFactory { public static final GroupByCacheFactory INSTANCE = new GroupByCacheFactory(); - + private GroupByCacheFactory() { } - + GroupByCache newCache(RegionCoprocessorEnvironment env, ImmutableBytesWritable tenantId, ServerAggregators aggregators, int estDistVals) { Configuration conf = env.getConfiguration(); boolean spillableEnabled = conf.getBoolean(GROUPBY_SPILLABLE_ATTRIB, DEFAULT_GROUPBY_SPILLABLE); if (spillableEnabled) { return new SpillableGroupByCache(env, tenantId, aggregators, estDistVals); - } - + } + return new InMemoryGroupByCache(env, tenantId, aggregators, estDistVals); } } @@ -344,16 +344,16 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { byte[] estDistValsBytes = scan.getAttribute(BaseScannerRegionObserver.ESTIMATED_DISTINCT_VALUES); if (estDistValsBytes != null) { // Allocate 1.5x estimation - estDistVals = Math.max(MIN_DISTINCT_VALUES, + estDistVals = Math.max(MIN_DISTINCT_VALUES, (int) (Bytes.toInt(estDistValsBytes) * 1.5f)); } final boolean spillableEnabled = conf.getBoolean(GROUPBY_SPILLABLE_ATTRIB, DEFAULT_GROUPBY_SPILLABLE); - GroupByCache groupByCache = + GroupByCache groupByCache = GroupByCacheFactory.INSTANCE.newCache( - env, ScanUtil.getTenantId(scan), + env, ScanUtil.getTenantId(scan), aggregators, estDistVals); boolean success = false; http://git-wip-us.apache.org/repos/asf/phoenix/blob/12494b7b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java index 756861b..97ff563 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java @@ -24,10 +24,10 @@ import java.util.List; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Scan; -import com.google.common.io.Closeables; import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.util.Closeables; import org.apache.phoenix.util.ServerUtil; http://git-wip-us.apache.org/repos/asf/phoenix/blob/12494b7b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index 3633724..086dc0f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -17,7 +17,6 @@ */ package org.apache.phoenix.query; -import static com.google.common.io.Closeables.closeQuietly; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CYCLE_FLAG; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MAX_VALUE; @@ -104,6 +103,7 @@ import org.apache.phoenix.schema.SequenceKey; import org.apache.phoenix.schema.TableAlreadyExistsException; import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.util.ByteUtil; +import org.apache.phoenix.util.Closeables; import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.PhoenixContextExecutor; import org.apache.phoenix.util.PhoenixRuntime; @@ -118,7 +118,6 @@ import com.google.common.base.Throwables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import com.google.common.io.Closeables; public class ConnectionQueryServicesImpl extends DelegateQueryServices implements ConnectionQueryServices { private static final Logger logger = LoggerFactory.getLogger(ConnectionQueryServicesImpl.class); @@ -131,26 +130,26 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement private final String userName; private final ConcurrentHashMap<ImmutableBytesWritable,ConnectionQueryServices> childServices; private final StatsManager statsManager; - + // Cache the latest meta data here for future connections // writes guarded by "latestMetaDataLock" private volatile PMetaData latestMetaData; private final Object latestMetaDataLock = new Object(); - + // Lowest HBase version on the cluster. private int lowestClusterHBaseVersion = Integer.MAX_VALUE; private boolean hasInvalidIndexConfiguration = false; - + @GuardedBy("connectionCountLock") private int connectionCount = 0; private final Object connectionCountLock = new Object(); - + private HConnection connection; private volatile boolean initialized; - + // writes guarded by "this" private volatile boolean closed; - + private volatile SQLException initializationException; // setting this member variable guarded by "connectionCountLock" private volatile ConcurrentMap<SequenceKey,Sequence> sequenceMap = Maps.newConcurrentMap(); @@ -194,7 +193,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement String hbaseVersion = VersionInfo.getVersion(); this.kvBuilder = KeyValueBuilder.get(hbaseVersion); } - + private void openConnection() throws SQLException { try { // check if we need to authenticate with kerberos @@ -220,7 +219,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement public StatsManager getStatsManager() { return this.statsManager; } - + @Override public HTableInterface getTable(byte[] tableName) throws SQLException { try { @@ -231,9 +230,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement throw new TableNotFoundException(Bytes.toString(schemaAndTableName[0]), Bytes.toString(schemaAndTableName[1])); } catch (IOException e) { throw new SQLException(e); - } + } } - + @Override public HTableDescriptor getTableDescriptor(byte[] tableName) throws SQLException { HTableInterface htable = getTable(tableName); @@ -276,7 +275,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement sqlE = e; } finally { try { - // Clear any client-side caches. + // Clear any client-side caches. statsManager.clearStats(); } catch (SQLException e) { if (sqlE == null) { @@ -316,7 +315,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } } } - } + } protected ConnectionQueryServices newChildQueryService() { return new ChildQueryServices(this); @@ -343,12 +342,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement public void clearTableRegionCache(byte[] tableName) throws SQLException { connection.clearRegionCache(tableName); } - + @Override public List<HRegionLocation> getAllTableRegions(byte[] tableName) throws SQLException { /* * Use HConnection.getRegionLocation as it uses the cache in HConnection, while getting - * all region locations from the HTable doesn't. + * all region locations from the HTable doesn't. */ int retryCount = 0, maxRetryCount = 1; boolean reload =false; @@ -387,8 +386,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement // If existing table isn't older than new table, don't replace // If a client opens a connection at an earlier timestamp, this can happen PTable existingTable = latestMetaData.getTable(new PTableKey(table.getTenantId(), table.getName().getString())); - if (existingTable.getTimeStamp() >= table.getTimeStamp()) { - return latestMetaData; + if (existingTable.getTimeStamp() >= table.getTimeStamp()) { + return latestMetaData; } } catch (TableNotFoundException e) {} latestMetaData = latestMetaData.addTable(table); @@ -525,7 +524,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement hcd.setValue(key, value == null ? null : value.toString()); } } - + private void addCoprocessors(byte[] tableName, HTableDescriptor descriptor, PTableType tableType) throws SQLException { // The phoenix jar must be available on HBase classpath try { @@ -541,7 +540,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement if (!descriptor.hasCoprocessor(ServerCachingEndpointImpl.class.getName())) { descriptor.addCoprocessor(ServerCachingEndpointImpl.class.getName(), null, 1, null); } - + // TODO: better encapsulation for this // Since indexes can't have indexes, don't install our indexing coprocessor for indexes. Also, // don't install on the metadata table until we fix the TODO there. @@ -550,7 +549,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement opts.put(CoveredColumnsIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName()); Indexer.enableIndexing(descriptor, PhoenixIndexBuilder.class, opts); } - + // Setup split policy on Phoenix metadata table to ensure that the key values of a Phoenix table // stay on the same region. if (SchemaUtil.isMetaTable(tableName)) { @@ -569,9 +568,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement throw ServerUtil.parseServerException(e); } } - + private HTableDescriptor generateTableDescriptor(byte[] tableName, HTableDescriptor existingDesc, PTableType tableType, Map<String,Object> tableProps, List<Pair<byte[],Map<String,Object>>> families, byte[][] splits) throws SQLException { - String defaultFamilyName = (String)tableProps.remove(PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME); + String defaultFamilyName = (String)tableProps.remove(PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME); HTableDescriptor descriptor = (existingDesc != null) ? new HTableDescriptor(existingDesc) : new HTableDescriptor(tableName); for (Entry<String,Object> entry : tableProps.entrySet()) { String key = entry.getKey(); @@ -581,7 +580,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement if (families.isEmpty()) { if (tableType != PTableType.VIEW) { byte[] defaultFamilyByes = defaultFamilyName == null ? QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES : Bytes.toBytes(defaultFamilyName); - // Add dummy column family so we have key values for tables that + // Add dummy column family so we have key values for tables that HColumnDescriptor columnDescriptor = generateColumnFamilyDescriptor(new Pair<byte[],Map<String,Object>>(defaultFamilyByes,Collections.<String,Object>emptyMap()), tableType); descriptor.addFamily(columnDescriptor); } @@ -639,7 +638,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } modifyColumnFamilyDescriptor(columnDescriptor, family); } - + if (columnDescriptor.equals(oldDescriptor)) { // Table already has family and it's the same. return; @@ -674,9 +673,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } } } - + /** - * + * * @param tableName * @param splits * @param modifyExistingMetaData TODO @@ -707,7 +706,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } HTableDescriptor newDesc = generateTableDescriptor(tableName, existingDesc, tableType , props, families, splits); - + if (!tableExist) { /* * Remove the splitPolicy attribute due to an HBase bug (see below) @@ -732,7 +731,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement * Now we modify the table to add the split policy, since we know that the client and * server and compatible. This works around a nasty, known HBase bug where if a split * policy class cannot be found on the server, the HBase table is left in a horrible - * "ghost" state where it can't be used and can't be deleted without bouncing the master. + * "ghost" state where it can't be used and can't be deleted without bouncing the master. */ newDesc.setValue(HTableDescriptor.SPLIT_POLICY, MetaDataSplitPolicy.class.getName()); admin.disableTable(tableName); @@ -752,12 +751,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement if (isMetaTable) { checkClientServerCompatibility(); } - + // TODO: Take advantage of online schema change ability by setting "hbase.online.schema.update.enable" to true admin.disableTable(tableName); admin.modifyTable(tableName, newDesc); admin.enableTable(tableName); - + return newDesc; } @@ -789,7 +788,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } return !MetaDataUtil.decodeMutableIndexConfiguredProperly(serverVersion); } - + private static boolean isCompatible(Long serverVersion) { if (serverVersion == null) { return false; @@ -820,7 +819,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement public Long call(MetaDataProtocol instance) throws IOException { return instance.getVersion(); } - }, + }, new Batch.Callback<Long>(){ @Override public void update(byte[] region, byte[] row, Long value) { @@ -865,7 +864,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement List<byte[]> regionKeys = Collections.singletonList(regionLocation.getRegionInfo().getStartKey()); final Map<byte[],MetaDataMutationResult> results = Maps.newHashMapWithExpectedSize(1); connection.processExecs(MetaDataProtocol.class, regionKeys, - PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, this.getDelegate().getExecutor(), callable, + PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, this.getDelegate().getExecutor(), callable, new Batch.Callback<MetaDataMutationResult>(){ @Override public void update(byte[] region, byte[] row, MetaDataMutationResult value) { @@ -887,7 +886,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement throw new SQLException(t); } } - + // Our property values are translated using toString, so we need to "string-ify" this. private static final String TRUE_BYTES_AS_STRING = Bytes.toString(PDataType.TRUE_BYTES); @@ -897,7 +896,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement maxFileSize = this.config.getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE); } byte[] physicalIndexName = MetaDataUtil.getViewIndexPhysicalName(physicalTableName); - + int indexMaxFileSizePerc; // Get percentage to use from table props first and then fallback to config Integer indexMaxFileSizePercProp = (Integer)tableProps.remove(QueryServices.INDEX_MAX_FILESIZE_PERC_ATTRIB); @@ -945,7 +944,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement // Ignore, as we may never have created a view index table } } catch (IOException e) { - throw ServerUtil.parseServerException(e); + throw ServerUtil.parseServerException(e); } finally { try { if (admin != null) admin.close(); @@ -955,7 +954,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } return wasDeleted; } - + @Override public MetaDataMutationResult createTable(final List<Mutation> tableMetaData, byte[] physicalTableName, PTableType tableType, Map<String,Object> tableProps, final List<Pair<byte[],Map<String,Object>>> families, byte[][] splits) throws SQLException { @@ -999,7 +998,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } ensureViewIndexTableCreated(tableName, tableProps, familiesPlusDefault, MetaDataUtil.isSalted(m, kvBuilder, ptr) ? splits : null, MetaDataUtil.getClientTimeStamp(m)); } - + byte[] tableKey = SchemaUtil.getTableKey(tenantIdBytes, schemaBytes, tableBytes); MetaDataMutationResult result = metaDataCoprocessorExec(tableKey, new Batch.Call<MetaDataProtocol, MetaDataMutationResult>() { @@ -1040,7 +1039,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement return instance.dropTable(tableMetaData, tableType.getSerializedValue(), cascade); } }); - + final MutationCode code = result.getMutationCode(); switch(code) { case TABLE_ALREADY_EXISTS: @@ -1060,7 +1059,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } return result; } - + private void dropTables(final List<byte[]> tableNamesToDelete) throws SQLException { HBaseAdmin admin = null; SQLException sqlE = null; @@ -1075,7 +1074,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } } } - + } catch (IOException e) { sqlE = ServerUtil.parseServerException(e); } finally { @@ -1106,11 +1105,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } return props; } - + private void ensureViewIndexTableCreated(PName tenantId, byte[] physicalIndexTableName, long timestamp) throws SQLException { PTable table; String name = Bytes.toString( - physicalIndexTableName, + physicalIndexTableName, MetaDataUtil.VIEW_INDEX_TABLE_PREFIX_BYTES.length, physicalIndexTableName.length-MetaDataUtil.VIEW_INDEX_TABLE_PREFIX_BYTES.length); try { @@ -1133,7 +1132,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } ensureViewIndexTableCreated(table, timestamp); } - + private void ensureViewIndexTableCreated(PTable table, long timestamp) throws SQLException { byte[] physicalTableName = table.getPhysicalName().getBytes(); HTableDescriptor htableDesc = this.getTableDescriptor(physicalTableName); @@ -1157,10 +1156,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement if (table.getBucketNum() != null) { splits = SaltingUtil.getSalteByteSplitPoints(table.getBucketNum()); } - + ensureViewIndexTableCreated(physicalTableName, tableProps, families, splits, timestamp); } - + @Override public MetaDataMutationResult addColumn(final List<Mutation> tableMetaData, List<Pair<byte[],Map<String,Object>>> families, PTable table) throws SQLException { byte[][] rowKeyMetaData = new byte[3][]; @@ -1199,7 +1198,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement && Boolean.FALSE.equals(PDataType.BOOLEAN.toObject(ptr))) { flushTable(table.getPhysicalName().getBytes()); } - + if (tableType == PTableType.TABLE) { // If we're changing MULTI_TENANT to true or false, create or drop the view index table if (MetaDataUtil.getMutationValue(m, PhoenixDatabaseMetaData.MULTI_TENANT_BYTES, kvBuilder, ptr)){ @@ -1243,11 +1242,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement break; } return result; - + } // Keeping this to use for further upgrades - protected PhoenixConnection addColumnsIfNotExists(PhoenixConnection oldMetaConnection, + protected PhoenixConnection addColumnsIfNotExists(PhoenixConnection oldMetaConnection, String tableName, long timestamp, String columns) throws SQLException { Properties props = new Properties(oldMetaConnection.getClientInfo()); @@ -1276,7 +1275,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } return metaConnection; } - + @Override public void init(final String url, final Properties props) throws SQLException { try { @@ -1317,10 +1316,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } catch (TableAlreadyExistsException ignore) { // This will occur if we have an older SYSTEM.CATALOG and we need to update it to include // any new columns we've added. - metaConnection = addColumnsIfNotExists(metaConnection, - PhoenixDatabaseMetaData.SYSTEM_CATALOG, - MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP, - PhoenixDatabaseMetaData.INDEX_TYPE + " " + PDataType.UNSIGNED_TINYINT.getSqlTypeName() + + metaConnection = addColumnsIfNotExists(metaConnection, + PhoenixDatabaseMetaData.SYSTEM_CATALOG, + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP, + PhoenixDatabaseMetaData.INDEX_TYPE + " " + PDataType.UNSIGNED_TINYINT.getSqlTypeName() + ", " + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP + " " + PDataType.LONG.getSqlTypeName()); } try { @@ -1332,12 +1331,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement // This will occur if we have an older SYSTEM.SEQUENCE, so we need to update it to include // any new columns we've added. String newColumns = - MIN_VALUE + " " + PDataType.LONG.getSqlTypeName() + ", " - + MAX_VALUE + " " + PDataType.LONG.getSqlTypeName() + ", " - + CYCLE_FLAG + " " + PDataType.BOOLEAN.getSqlTypeName() + ", " + MIN_VALUE + " " + PDataType.LONG.getSqlTypeName() + ", " + + MAX_VALUE + " " + PDataType.LONG.getSqlTypeName() + ", " + + CYCLE_FLAG + " " + PDataType.BOOLEAN.getSqlTypeName() + ", " + LIMIT_REACHED_FLAG + " " + PDataType.BOOLEAN.getSqlTypeName(); - metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME, - MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP, newColumns); + metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME, + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP, newColumns); } } catch (Exception e) { if (e instanceof SQLException) { @@ -1490,7 +1489,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } catch (IOException e) { throw ServerUtil.parseServerException(e); } finally { - closeQuietly(htable); + Closeables.closeQuietly(htable); } } finally { sequence.getLock().unlock(); @@ -1516,7 +1515,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } catch (IOException e) { throw ServerUtil.parseServerException(e); } finally { - closeQuietly(htable); + Closeables.closeQuietly(htable); } } finally { sequence.getLock().unlock(); @@ -1527,7 +1526,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement * Gets the current sequence value * @param tenantId * @param sequence - * @throws SQLException if cached sequence cannot be found + * @throws SQLException if cached sequence cannot be found */ @Override public long currentSequenceValue(SequenceKey sequenceKey, long timestamp) throws SQLException { @@ -1548,7 +1547,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement sequence.getLock().unlock(); } } - + /** * Verifies that sequences exist and reserves values for them if reserveValues is true */ @@ -1556,17 +1555,17 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement public void validateSequences(List<SequenceKey> sequenceKeys, long timestamp, long[] values, SQLException[] exceptions, Sequence.ValueOp action) throws SQLException { incrementSequenceValues(sequenceKeys, timestamp, values, exceptions, 0, action); } - + /** * Increment any of the set of sequences that need more values. These are the sequences * that are asking for the next value within a given statement. The returned sequences - * are the ones that were not found because they were deleted by another client. + * are the ones that were not found because they were deleted by another client. * @param tenantId * @param sequenceKeys sorted list of sequence kyes * @param batchSize * @param timestamp * @throws SQLException if any of the sequences cannot be found - * + * */ @Override public void incrementSequences(List<SequenceKey> sequenceKeys, long timestamp, long[] values, SQLException[] exceptions) throws SQLException { @@ -1752,7 +1751,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } } } - + @Override public void addConnection(PhoenixConnection connection) throws SQLException { synchronized (connectionCountLock) { @@ -1790,26 +1789,26 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement // For now, only Feature is REVERSE_SCAN and it's not supported in any version yet return false; } - + @Override public String getUserName() { return userName; } - + private void checkClosed() { if (closed) { throwConnectionClosedException(); } } - + private void throwConnectionClosedIfNullMetaData() { if (latestMetaData == null) { throwConnectionClosedException(); } } - + private void throwConnectionClosedException() { throw new IllegalStateException("Connection to the cluster is closed"); } - + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/12494b7b/phoenix-core/src/main/java/org/apache/phoenix/util/Closeables.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/Closeables.java b/phoenix-core/src/main/java/org/apache/phoenix/util/Closeables.java index c4b15dc..3046929 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/Closeables.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/Closeables.java @@ -17,11 +17,16 @@ */ package org.apache.phoenix.util; +import com.google.common.collect.Iterables; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; import java.io.Closeable; import java.io.IOException; -import java.util.*; - -import com.google.common.collect.Iterables; +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedList; /** @@ -29,10 +34,38 @@ import com.google.common.collect.Iterables; * */ public class Closeables { + + private static final Logger LOGGER = LoggerFactory.getLogger(Closeables.class); + /** Not constructed */ private Closeables() { } /** + * Close a {@code Closeable}, returning an {@code IOException} if it occurs while closing + * instead of throwing it. This is nearly a clone of the Guava Closeables.closeQuietly method + * which has long since been removed from Guava. + * + * Use of this method should be avoided -- quietly swallowing IOExceptions (particularly on + * Closeables that are being written to) is a code smell. Use of the equivalent method in + * Guava was done for this reason. + * + * @param closeable the Closeable to be closed, can be null + * @return the IOException if one was thrown, otherwise {@code null} + */ + public static IOException closeQuietly(@Nullable Closeable closeable) { + if (closeable == null) { + return null; + } + try { + closeable.close(); + return null; + } catch (IOException e) { + LOGGER.error("Error closing " + closeable, e); + return e; + } + } + + /** * Allows you to close as many of the {@link Closeable}s as possible. * * If any of the close's fail with an IOException, those exception(s) will @@ -48,11 +81,10 @@ public class Closeables { LinkedList<IOException> exceptions = null; for (Closeable closeable : iterable) { - try { - closeable.close(); - } catch (IOException x) { + IOException ioe = closeQuietly(closeable); + if (ioe != null) { if (exceptions == null) exceptions = new LinkedList<IOException>(); - exceptions.add(x); + exceptions.add(ioe); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/12494b7b/phoenix-flume/src/main/java/org/apache/phoenix/flume/sink/PhoenixSink.java ---------------------------------------------------------------------- diff --git a/phoenix-flume/src/main/java/org/apache/phoenix/flume/sink/PhoenixSink.java b/phoenix-flume/src/main/java/org/apache/phoenix/flume/sink/PhoenixSink.java index efcbef6..f9c929d 100644 --- a/phoenix-flume/src/main/java/org/apache/phoenix/flume/sink/PhoenixSink.java +++ b/phoenix-flume/src/main/java/org/apache/phoenix/flume/sink/PhoenixSink.java @@ -39,7 +39,6 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; -import com.google.common.base.Stopwatch; import com.google.common.base.Throwables; import com.google.common.collect.Lists; @@ -140,7 +139,7 @@ public final class PhoenixSink extends AbstractSink implements Configurable { Channel channel = getChannel(); Transaction transaction = null; List<Event> events = Lists.newArrayListWithExpectedSize(this.batchSize); - Stopwatch watch = new Stopwatch().start(); + long startTime = System.nanoTime(); try { transaction = channel.getTransaction(); transaction.begin(); @@ -194,7 +193,9 @@ public final class PhoenixSink extends AbstractSink implements Configurable { throw new EventDeliveryException("Failed to persist message", e); } finally { - logger.error(String.format("Time taken to process [%s] events was [%s] seconds",events.size(),watch.stop().elapsedTime(TimeUnit.SECONDS))); + logger.info(String.format("Time taken to process [%s] events was [%s] seconds", + events.size(), + TimeUnit.SECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS))); if( transaction != null ) { transaction.close(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/12494b7b/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 7be22ca..d2d30f0 100644 --- a/pom.xml +++ b/pom.xml @@ -523,6 +523,11 @@ <scope>compile</scope> </dependency> <dependency> + <groupId>com.google.code.findbugs</groupId> + <artifactId>jsr305</artifactId> + <version>2.0.1</version> + </dependency> + <dependency> <groupId>org.codehaus.jackson</groupId> <artifactId>jackson-jaxrs</artifactId> <version>${jackson.version}</version>