http://git-wip-us.apache.org/repos/asf/phoenix/blob/ccd04ba6/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index 9aa4b5d..dbbc37b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -34,6 +34,7 @@ import java.lang.ref.WeakReference; import java.sql.SQLException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -79,6 +80,7 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.ipc.BlockingRpcCallback; +import org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory; import org.apache.hadoop.hbase.ipc.ServerRpcController; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto; import org.apache.hadoop.hbase.regionserver.IndexHalfStoreFileReaderGenerator; @@ -250,6 +252,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement // setting this member variable guarded by "connectionCountLock" private volatile ConcurrentMap<SequenceKey,Sequence> sequenceMap = Maps.newConcurrentMap(); private KeyValueBuilder kvBuilder; + private final int renewLeaseTaskFrequency; private final int renewLeasePoolSize; private final int renewLeaseThreshold; @@ -263,7 +266,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement private static interface FeatureSupported { boolean isSupported(ConnectionQueryServices services); } - + private final Map<Feature, FeatureSupported> featureMap = ImmutableMap.<Feature, FeatureSupported>of( Feature.LOCAL_INDEX, new FeatureSupported(){ @Override @@ -278,12 +281,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement int hbaseVersion = services.getLowestClusterHBaseVersion(); return hbaseVersion >= PhoenixDatabaseMetaData.MIN_RENEW_LEASE_VERSION; } - }); - + }); + private PMetaData newEmptyMetaData() { return new PSynchronizedMetaData(new PMetaDataImpl(INITIAL_META_DATA_TABLE_CAPACITY, getProps())); } - + /** * Construct a ConnectionQueryServicesImpl that represents a connection to an HBase * cluster. @@ -341,7 +344,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement public TransactionSystemClient getTransactionSystemClient() { return txServiceClient; } - + private void initTxServiceClient() { String zkQuorumServersString = this.getProps().get(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM); if (zkQuorumServersString==null) { @@ -349,13 +352,13 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } int timeOut = props.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT); - // Create instance of the tephra zookeeper client + // Create instance of the tephra zookeeper client ZKClientService tephraZKClientService = new TephraZKClientService(zkQuorumServersString, timeOut, null, ArrayListMultimap.<String, byte[]>create()); - + ZKClientService zkClientService = ZKClientServices.delegate( - ZKClients.reWatchOnExpire( - ZKClients.retryOnFailure(tephraZKClientService, RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS)) - ) + ZKClients.reWatchOnExpire( + ZKClients.retryOnFailure(tephraZKClientService, RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS)) + ) ); zkClientService.startAndWait(); ZKDiscoveryService zkDiscoveryService = new ZKDiscoveryService(zkClientService); @@ -363,7 +366,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement config, zkDiscoveryService); this.txServiceClient = new TransactionServiceClient(config,pooledClientProvider); } - + private void openConnection() throws SQLException { try { boolean transactionsEnabled = props.getBoolean( @@ -376,7 +379,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement this.connection = HBaseFactoryProvider.getHConnectionFactory().createConnection(this.config); } catch (IOException e) { throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION) - .setRootCause(e).build().buildException(); + .setRootCause(e).build().buildException(); } if (this.connection.isClosed()) { // TODO: why the heck doesn't this throw above? throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION).build().buildException(); @@ -390,7 +393,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } catch (org.apache.hadoop.hbase.TableNotFoundException e) { throw new TableNotFoundException(SchemaUtil.getSchemaNameFromFullName(tableName), SchemaUtil.getTableNameFromFullName(tableName)); } catch (IOException e) { - throw new SQLException(e); + throw new SQLException(e); } } @@ -400,11 +403,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement try { return htable.getTableDescriptor(); } catch (IOException e) { - if(e instanceof org.apache.hadoop.hbase.TableNotFoundException || - e.getCause() instanceof org.apache.hadoop.hbase.TableNotFoundException) { - byte[][] schemaAndTableName = new byte[2][]; - SchemaUtil.getVarChars(tableName, schemaAndTableName); - throw new TableNotFoundException(Bytes.toString(schemaAndTableName[0]), Bytes.toString(schemaAndTableName[1])); + if(e instanceof org.apache.hadoop.hbase.TableNotFoundException + || e.getCause() instanceof org.apache.hadoop.hbase.TableNotFoundException) { + byte[][] schemaAndTableName = new byte[2][]; + SchemaUtil.getVarChars(tableName, schemaAndTableName); + throw new TableNotFoundException(Bytes.toString(schemaAndTableName[0]), Bytes.toString(schemaAndTableName[1])); } throw new RuntimeException(e); } finally { @@ -517,10 +520,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement List<HRegionLocation> locations = Lists.newArrayList(); byte[] currentKey = HConstants.EMPTY_START_ROW; do { - HRegionLocation regionLocation = connection.getRegionLocation( - TableName.valueOf(tableName), currentKey, reload); - locations.add(regionLocation); - currentKey = regionLocation.getRegionInfo().getEndKey(); + HRegionLocation regionLocation = connection.getRegionLocation( + TableName.valueOf(tableName), currentKey, reload); + locations.add(regionLocation); + currentKey = regionLocation.getRegionInfo().getEndKey(); } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW)); return locations; } catch (org.apache.hadoop.hbase.TableNotFoundException e) { @@ -532,7 +535,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement continue; } throw new SQLExceptionInfo.Builder(SQLExceptionCode.GET_TABLE_REGIONS_FAIL) - .setRootCause(e).build().buildException(); + .setRootCause(e).build().buildException(); } } } @@ -553,7 +556,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement latestMetaDataLock.notifyAll(); } } - + @Override public void updateResolvedTimestamp(PTable table, long resolvedTime) throws SQLException { synchronized (latestMetaDataLock) { @@ -610,22 +613,22 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement // restore the interrupt status Thread.currentThread().interrupt(); throw new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION) - .setRootCause(e).build().buildException(); // FIXME + .setRootCause(e).build().buildException(); // FIXME } } latestMetaData = metaData; latestMetaDataLock.notifyAll(); return metaData; } - } + } - @Override + @Override public void addColumn(final PName tenantId, final String tableName, final List<PColumn> columns, final long tableTimeStamp, final long tableSeqNum, final boolean isImmutableRows, final boolean isWalDisabled, final boolean isMultitenant, final boolean storeNulls, final boolean isTransactional, final long updateCacheFrequency, final boolean isNamespaceMapped, final long resolvedTime) throws SQLException { - metaDataMutated(tenantId, tableName, tableSeqNum, new Mutator() { + metaDataMutated(tenantId, tableName, tableSeqNum, new Mutator() { @Override public void mutate(PMetaData metaData) throws SQLException { try { @@ -637,7 +640,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } } }); - } + } @Override public void removeTable(PName tenantId, final String tableName, String parentTableName, long tableTimeStamp) throws SQLException { @@ -678,7 +681,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement if (tableType != PTableType.VIEW) { if(props.get(QueryServices.DEFAULT_KEEP_DELETED_CELLS_ATTRIB) != null){ columnDesc.setKeepDeletedCells(props.getBoolean( - QueryServices.DEFAULT_KEEP_DELETED_CELLS_ATTRIB, QueryServicesOptions.DEFAULT_KEEP_DELETED_CELLS)); + QueryServices.DEFAULT_KEEP_DELETED_CELLS_ATTRIB, QueryServicesOptions.DEFAULT_KEEP_DELETED_CELLS)); } columnDesc.setDataBlockEncoding(SchemaUtil.DEFAULT_DATA_BLOCK_ENCODING); for (Entry<String,Object> entry : family.getSecond().entrySet()) { @@ -689,7 +692,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } return columnDesc; } - + // Workaround HBASE-14737 private static void setHColumnDescriptorValue(HColumnDescriptor columnDesc, String key, Object value) { if (HConstants.VERSIONS.equals(key)) { @@ -712,7 +715,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } return Integer.parseInt(stringValue); } - + private void modifyColumnFamilyDescriptor(HColumnDescriptor hcd, Map<String,Object> props) throws SQLException { for (Entry<String, Object> entry : props.entrySet()) { String propName = entry.getKey(); @@ -720,14 +723,14 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement setHColumnDescriptorValue(hcd, propName, value); } } - + private HTableDescriptor generateTableDescriptor(byte[] tableName, HTableDescriptor existingDesc, PTableType tableType, Map<String, Object> tableProps, List<Pair<byte[], Map<String, Object>>> families, byte[][] splits, boolean isNamespaceMapped) throws SQLException { String defaultFamilyName = (String)tableProps.remove(PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME); HTableDescriptor tableDescriptor = (existingDesc != null) ? new HTableDescriptor(existingDesc) - : new HTableDescriptor( - SchemaUtil.getPhysicalHBaseTableName(tableName, isNamespaceMapped, tableType).getBytes()); + : new HTableDescriptor( + SchemaUtil.getPhysicalHBaseTableName(tableName, isNamespaceMapped, tableType).getBytes()); for (Entry<String,Object> entry : tableProps.entrySet()) { String key = entry.getKey(); if (!TableProperty.isPhoenixTableProperty(key)) { @@ -769,9 +772,31 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } } addCoprocessors(tableName, tableDescriptor, tableType, tableProps); + + // PHOENIX-3072: Set index priority if this is a system table or index table + if (tableType == PTableType.SYSTEM) { + tableDescriptor.setValue(QueryConstants.PRIORITY, + String.valueOf(PhoenixRpcSchedulerFactory.getMetadataPriority(config))); + } else if (tableType == PTableType.INDEX // Global, mutable index + && !isLocalIndexTable(tableDescriptor.getFamiliesKeys()) + && !Boolean.TRUE.equals(tableProps.get(PhoenixDatabaseMetaData.IMMUTABLE_ROWS))) { + tableDescriptor.setValue(QueryConstants.PRIORITY, + String.valueOf(PhoenixRpcSchedulerFactory.getIndexPriority(config))); + } return tableDescriptor; } + private boolean isLocalIndexTable(Collection<byte[]> families) { + // no easier way to know local index table? + for (byte[] family: families) { + if (Bytes.toString(family).startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX)) { + return true; + } + } + return false; + } + + private void addCoprocessors(byte[] tableName, HTableDescriptor descriptor, PTableType tableType, Map<String,Object> tableProps) throws SQLException { // The phoenix jar must be available on HBase classpath int priority = props.getInt(QueryServices.COPROCESSOR_PRIORITY_ATTRIB, QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY); @@ -788,7 +813,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement if (!descriptor.hasCoprocessor(ServerCachingEndpointImpl.class.getName())) { descriptor.addCoprocessor(ServerCachingEndpointImpl.class.getName(), null, priority, null); } - boolean isTransactional = + boolean isTransactional = Boolean.TRUE.equals(tableProps.get(TableProperty.TRANSACTIONAL.name())) || Boolean.TRUE.equals(tableProps.get(TxConstants.READ_NON_TX_DATA)); // For ALTER TABLE // TODO: better encapsulation for this @@ -828,7 +853,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement if(Bytes.toString(family).startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX)) { if (!descriptor.hasCoprocessor(IndexHalfStoreFileReaderGenerator.class.getName())) { descriptor.addCoprocessor(IndexHalfStoreFileReaderGenerator.class.getName(), - null, priority, null); + null, priority, null); break; } } @@ -850,7 +875,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement descriptor.addCoprocessor(SequenceRegionObserver.class.getName(), null, priority, null); } } - + if (isTransactional) { if (!descriptor.hasCoprocessor(PhoenixTransactionalProcessor.class.getName())) { descriptor.addCoprocessor(PhoenixTransactionalProcessor.class.getName(), null, priority - 10, null); @@ -859,7 +884,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement // If exception on alter table to transition back to non transactional if (descriptor.hasCoprocessor(PhoenixTransactionalProcessor.class.getName())) { descriptor.removeCoprocessor(PhoenixTransactionalProcessor.class.getName()); - } + } } } catch (IOException e) { throw ServerUtil.parseServerException(e); @@ -977,7 +1002,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement SQLException sqlE = null; HTableDescriptor existingDesc = null; boolean isMetaTable = SchemaUtil.isMetaTable(tableName); - byte[] physicalTable = SchemaUtil.getPhysicalHBaseTableName(tableName, isNamespaceMapped, tableType).getBytes(); + byte[] physicalTable = SchemaUtil.getPhysicalHBaseTableName(tableName, isNamespaceMapped, tableType).getBytes(); boolean tableExist = true; try (HBaseAdmin admin = getAdmin()) { final String quorum = ZKConfig.getZKQuorumServersString(config); @@ -1001,7 +1026,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement if (!tableExist) { if (newDesc.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES) != null && Boolean.TRUE.equals( - PBoolean.INSTANCE.toObject(newDesc.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES)))) { + PBoolean.INSTANCE.toObject(newDesc.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES)))) { newDesc.setValue(HTableDescriptor.SPLIT_POLICY, IndexRegionSplitPolicy.class.getName()); } // Remove the splitPolicy attribute to prevent HBASE-12570 @@ -1043,12 +1068,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } else { for(Pair<byte[],Map<String,Object>> family: families) { if ((newDesc.getValue(HTableDescriptor.SPLIT_POLICY)==null || !newDesc.getValue(HTableDescriptor.SPLIT_POLICY).equals( - IndexRegionSplitPolicy.class.getName())) + IndexRegionSplitPolicy.class.getName())) && Bytes.toString(family.getFirst()).startsWith( - QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX)) { - newDesc.setValue(HTableDescriptor.SPLIT_POLICY, IndexRegionSplitPolicy.class.getName()); - break; - } + QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX)) { + newDesc.setValue(HTableDescriptor.SPLIT_POLICY, IndexRegionSplitPolicy.class.getName()); + break; + } } } @@ -1097,18 +1122,18 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement private void modifyTable(byte[] tableName, HTableDescriptor newDesc, boolean shouldPoll) throws IOException, InterruptedException, TimeoutException, SQLException { - try (HBaseAdmin admin = getAdmin()) { - if (!allowOnlineTableSchemaUpdate()) { - admin.disableTable(tableName); - admin.modifyTable(tableName, newDesc); - admin.enableTable(tableName); - } else { - admin.modifyTable(tableName, newDesc); - if (shouldPoll) { - pollForUpdatedTableDescriptor(admin, newDesc, tableName); - } - } - } + try (HBaseAdmin admin = getAdmin()) { + if (!allowOnlineTableSchemaUpdate()) { + admin.disableTable(tableName); + admin.modifyTable(tableName, newDesc); + admin.enableTable(tableName); + } else { + admin.modifyTable(tableName, newDesc); + if (shouldPoll) { + pollForUpdatedTableDescriptor(admin, newDesc, tableName); + } + } + } } private static boolean hasIndexWALCodec(Long serverVersion) { @@ -1181,18 +1206,18 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement if (isTableNamespaceMappingEnabled != SchemaUtil.isNamespaceMappingEnabled(PTableType.TABLE, getProps())) { throw new SQLExceptionInfo.Builder( SQLExceptionCode.INCONSISTENET_NAMESPACE_MAPPING_PROPERTIES) - .setMessage( - "Ensure that config " + QueryServices.IS_NAMESPACE_MAPPING_ENABLED - + " is consitent on client and server.") - .build().buildException(); } + .setMessage( + "Ensure that config " + QueryServices.IS_NAMESPACE_MAPPING_ENABLED + + " is consitent on client and server.") + .build().buildException(); } lowestClusterHBaseVersion = minHBaseVersion; } catch (SQLException e) { throw e; } catch (Throwable t) { // This is the case if the "phoenix.jar" is not on the classpath of HBase on the region server throw new SQLExceptionInfo.Builder(SQLExceptionCode.INCOMPATIBLE_CLIENT_SERVER_JAR).setRootCause(t) - .setMessage("Ensure that " + QueryConstants.DEFAULT_COPROCESS_PATH + " is put on the classpath of HBase in every region server: " + t.getMessage()) - .build().buildException(); + .setMessage("Ensure that " + QueryConstants.DEFAULT_COPROCESS_PATH + " is put on the classpath of HBase in every region server: " + t.getMessage()) + .build().buildException(); } finally { if (ht != null) { try { @@ -1216,12 +1241,13 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement Batch.Call<MetaDataService, MetaDataResponse> callable) throws SQLException { return metaDataCoprocessorExec(tableKey, callable, PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES); } - /** - * Invoke meta data coprocessor with one retry if the key was found to not be in the regions - * (due to a table split) - */ - private MetaDataMutationResult metaDataCoprocessorExec(byte[] tableKey, - Batch.Call<MetaDataService, MetaDataResponse> callable, byte[] tableName) throws SQLException { + + /** + * Invoke meta data coprocessor with one retry if the key was found to not be in the regions + * (due to a table split) + */ + private MetaDataMutationResult metaDataCoprocessorExec(byte[] tableKey, + Batch.Call<MetaDataService, MetaDataResponse> callable, byte[] tableName) throws SQLException { try { boolean retried = false; @@ -1261,10 +1287,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement private void ensureViewIndexTableCreated(byte[] physicalTableName, Map<String, Object> tableProps, List<Pair<byte[], Map<String, Object>>> families, byte[][] splits, long timestamp, boolean isNamespaceMapped) throws SQLException { - Long maxFileSize = (Long)tableProps.get(HTableDescriptor.MAX_FILESIZE); - if (maxFileSize == null) { - maxFileSize = this.props.getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE); - } byte[] physicalIndexName = MetaDataUtil.getViewIndexPhysicalName(physicalTableName); tableProps.put(MetaDataUtil.IS_VIEW_INDEX_TABLE_PROP_NAME, TRUE_BYTES_AS_STRING); @@ -1322,11 +1344,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement for(HColumnDescriptor cf : desc.getColumnFamilies()) { if(cf.getNameAsString().startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX)) { columnFamiles.add(cf.getNameAsString()); - } + } } for(String cf: columnFamiles) { admin.deleteColumn(physicalTableName, cf); - } + } clearTableRegionCache(physicalTableName); wasDeleted = true; } @@ -1354,10 +1376,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement byte[] tableName = physicalTableName != null ? physicalTableName : SchemaUtil.getTableNameAsBytes(schemaBytes, tableBytes); boolean localIndexTable = false; for(Pair<byte[], Map<String, Object>> family: families) { - if(Bytes.toString(family.getFirst()).startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX)) { - localIndexTable = true; - break; - } + if(Bytes.toString(family.getFirst()).startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX)) { + localIndexTable = true; + break; + } } if ((tableType == PTableType.VIEW && physicalTableName != null) || (tableType != PTableType.VIEW && (physicalTableName == null || localIndexTable))) { // For views this will ensure that metadata already exists @@ -1396,30 +1418,30 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement ensureViewIndexTableCreated( SchemaUtil.getPhysicalHBaseTableName(tableName, isNamespaceMapped, tableType).getBytes(), tableProps, familiesPlusDefault, MetaDataUtil.isSalted(m, kvBuilder, ptr) ? splits : null, - MetaDataUtil.getClientTimeStamp(m), isNamespaceMapped); + MetaDataUtil.getClientTimeStamp(m), isNamespaceMapped); } byte[] tableKey = SchemaUtil.getTableKey(tenantIdBytes, schemaBytes, tableBytes); MetaDataMutationResult result = metaDataCoprocessorExec(tableKey, - new Batch.Call<MetaDataService, MetaDataResponse>() { + new Batch.Call<MetaDataService, MetaDataResponse>() { @Override - public MetaDataResponse call(MetaDataService instance) throws IOException { - ServerRpcController controller = new ServerRpcController(); - BlockingRpcCallback<MetaDataResponse> rpcCallback = - new BlockingRpcCallback<MetaDataResponse>(); - CreateTableRequest.Builder builder = CreateTableRequest.newBuilder(); - for (Mutation m : tableMetaData) { - MutationProto mp = ProtobufUtil.toProto(m); - builder.addTableMetadataMutations(mp.toByteString()); - } - builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER)); - CreateTableRequest build = builder.build(); - instance.createTable(controller, build, rpcCallback); - if(controller.getFailedOn() != null) { - throw controller.getFailedOn(); - } - return rpcCallback.get(); - } + public MetaDataResponse call(MetaDataService instance) throws IOException { + ServerRpcController controller = new ServerRpcController(); + BlockingRpcCallback<MetaDataResponse> rpcCallback = + new BlockingRpcCallback<MetaDataResponse>(); + CreateTableRequest.Builder builder = CreateTableRequest.newBuilder(); + for (Mutation m : tableMetaData) { + MutationProto mp = ProtobufUtil.toProto(m); + builder.addTableMetadataMutations(mp.toByteString()); + } + builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER)); + CreateTableRequest build = builder.build(); + instance.createTable(controller, build, rpcCallback); + if(controller.getFailedOn() != null) { + throw controller.getFailedOn(); + } + return rpcCallback.get(); + } }); return result; } @@ -1430,26 +1452,26 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement final byte[] tenantIdBytes = tenantId == null ? ByteUtil.EMPTY_BYTE_ARRAY : tenantId.getBytes(); byte[] tableKey = SchemaUtil.getTableKey(tenantIdBytes, schemaBytes, tableBytes); return metaDataCoprocessorExec(tableKey, - new Batch.Call<MetaDataService, MetaDataResponse>() { - @Override - public MetaDataResponse call(MetaDataService instance) throws IOException { - ServerRpcController controller = new ServerRpcController(); - BlockingRpcCallback<MetaDataResponse> rpcCallback = - new BlockingRpcCallback<MetaDataResponse>(); - GetTableRequest.Builder builder = GetTableRequest.newBuilder(); - builder.setTenantId(ByteStringer.wrap(tenantIdBytes)); - builder.setSchemaName(ByteStringer.wrap(schemaBytes)); - builder.setTableName(ByteStringer.wrap(tableBytes)); - builder.setTableTimestamp(tableTimestamp); - builder.setClientTimestamp(clientTimestamp); - builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER)); - instance.getTable(controller, builder.build(), rpcCallback); - if(controller.getFailedOn() != null) { - throw controller.getFailedOn(); - } - return rpcCallback.get(); + new Batch.Call<MetaDataService, MetaDataResponse>() { + @Override + public MetaDataResponse call(MetaDataService instance) throws IOException { + ServerRpcController controller = new ServerRpcController(); + BlockingRpcCallback<MetaDataResponse> rpcCallback = + new BlockingRpcCallback<MetaDataResponse>(); + GetTableRequest.Builder builder = GetTableRequest.newBuilder(); + builder.setTenantId(ByteStringer.wrap(tenantIdBytes)); + builder.setSchemaName(ByteStringer.wrap(schemaBytes)); + builder.setTableName(ByteStringer.wrap(tableBytes)); + builder.setTableTimestamp(tableTimestamp); + builder.setClientTimestamp(clientTimestamp); + builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER)); + instance.getTable(controller, builder.build(), rpcCallback); + if(controller.getFailedOn() != null) { + throw controller.getFailedOn(); } - }); + return rpcCallback.get(); + } + }); } @Override @@ -1463,26 +1485,26 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement byte[] tableKey = SchemaUtil.getTableKey(tenantIdBytes == null ? ByteUtil.EMPTY_BYTE_ARRAY : tenantIdBytes, schemaBytes, tableBytes); final MetaDataMutationResult result = metaDataCoprocessorExec(tableKey, new Batch.Call<MetaDataService, MetaDataResponse>() { - @Override - public MetaDataResponse call(MetaDataService instance) throws IOException { - ServerRpcController controller = new ServerRpcController(); - BlockingRpcCallback<MetaDataResponse> rpcCallback = - new BlockingRpcCallback<MetaDataResponse>(); - DropTableRequest.Builder builder = DropTableRequest.newBuilder(); - for (Mutation m : tableMetaData) { - MutationProto mp = ProtobufUtil.toProto(m); - builder.addTableMetadataMutations(mp.toByteString()); - } - builder.setTableType(tableType.getSerializedValue()); - builder.setCascade(cascade); - builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER)); - instance.dropTable(controller, builder.build(), rpcCallback); - if(controller.getFailedOn() != null) { - throw controller.getFailedOn(); - } - return rpcCallback.get(); - } - }); + @Override + public MetaDataResponse call(MetaDataService instance) throws IOException { + ServerRpcController controller = new ServerRpcController(); + BlockingRpcCallback<MetaDataResponse> rpcCallback = + new BlockingRpcCallback<MetaDataResponse>(); + DropTableRequest.Builder builder = DropTableRequest.newBuilder(); + for (Mutation m : tableMetaData) { + MutationProto mp = ProtobufUtil.toProto(m); + builder.addTableMetadataMutations(mp.toByteString()); + } + builder.setTableType(tableType.getSerializedValue()); + builder.setCascade(cascade); + builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER)); + instance.dropTable(controller, builder.build(), rpcCallback); + if(controller.getFailedOn() != null) { + throw controller.getFailedOn(); + } + return rpcCallback.get(); + } + }); final MutationCode code = result.getMutationCode(); switch(code) { @@ -1506,7 +1528,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement default: break; } - return result; + return result; } /* @@ -1536,28 +1558,28 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement byte[] tenantIdBytes = rowKeyMetadata[PhoenixDatabaseMetaData.TENANT_ID_INDEX]; byte[] functionBytes = rowKeyMetadata[PhoenixDatabaseMetaData.FUNTION_NAME_INDEX]; byte[] functionKey = SchemaUtil.getFunctionKey(tenantIdBytes, functionBytes); - + final MetaDataMutationResult result = metaDataCoprocessorExec(functionKey, new Batch.Call<MetaDataService, MetaDataResponse>() { - @Override - public MetaDataResponse call(MetaDataService instance) throws IOException { - ServerRpcController controller = new ServerRpcController(); - BlockingRpcCallback<MetaDataResponse> rpcCallback = - new BlockingRpcCallback<MetaDataResponse>(); - DropFunctionRequest.Builder builder = DropFunctionRequest.newBuilder(); - for (Mutation m : functionData) { - MutationProto mp = ProtobufUtil.toProto(m); - builder.addTableMetadataMutations(mp.toByteString()); - } - builder.setIfExists(ifExists); - builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER)); - instance.dropFunction(controller, builder.build(), rpcCallback); - if(controller.getFailedOn() != null) { - throw controller.getFailedOn(); - } - return rpcCallback.get(); - } - }, PhoenixDatabaseMetaData.SYSTEM_FUNCTION_NAME_BYTES); + @Override + public MetaDataResponse call(MetaDataService instance) throws IOException { + ServerRpcController controller = new ServerRpcController(); + BlockingRpcCallback<MetaDataResponse> rpcCallback = + new BlockingRpcCallback<MetaDataResponse>(); + DropFunctionRequest.Builder builder = DropFunctionRequest.newBuilder(); + for (Mutation m : functionData) { + MutationProto mp = ProtobufUtil.toProto(m); + builder.addTableMetadataMutations(mp.toByteString()); + } + builder.setIfExists(ifExists); + builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER)); + instance.dropFunction(controller, builder.build(), rpcCallback); + if(controller.getFailedOn() != null) { + throw controller.getFailedOn(); + } + return rpcCallback.get(); + } + }, PhoenixDatabaseMetaData.SYSTEM_FUNCTION_NAME_BYTES); return result; } @@ -1602,7 +1624,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement String name = Bytes .toString(SchemaUtil.getParentTableNameFromIndexTable(physicalIndexTableName, MetaDataUtil.VIEW_INDEX_TABLE_PREFIX)) - .replace(QueryConstants.NAMESPACE_SEPARATOR, QueryConstants.NAME_SEPARATOR); + .replace(QueryConstants.NAMESPACE_SEPARATOR, QueryConstants.NAME_SEPARATOR); PTable table = getTable(tenantId, name, timestamp); ensureViewIndexTableCreated(table, timestamp, isNamespaceMapped); } @@ -1614,7 +1636,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement throwConnectionClosedIfNullMetaData(); table = metadata.getTableRef(new PTableKey(tenantId, fullTableName)).getTable(); if (table.getTimeStamp() >= timestamp) { // Table in cache is newer than client timestamp which shouldn't be - // the case + // the case throw new TableNotFoundException(table.getSchemaName().getString(), table.getTableName().getString()); } } catch (TableNotFoundException e) { @@ -1628,8 +1650,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement return table; } - private void ensureViewIndexTableCreated(PTable table, long timestamp, boolean isNamespaceMapped) - throws SQLException { + private void ensureViewIndexTableCreated(PTable table, long timestamp, boolean isNamespaceMapped) + throws SQLException { byte[] physicalTableName = table.getPhysicalName().getBytes(); HTableDescriptor htableDesc = this.getTableDescriptor(physicalTableName); Map<String,Object> tableProps = createPropertiesMap(htableDesc.getValues()); @@ -1653,13 +1675,17 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement splits = SaltingUtil.getSalteByteSplitPoints(table.getBucketNum()); } + // Transfer over table values into tableProps + // TODO: encapsulate better + tableProps.put(PhoenixDatabaseMetaData.TRANSACTIONAL, table.isTransactional()); + tableProps.put(PhoenixDatabaseMetaData.IMMUTABLE_ROWS, table.isImmutableRows()); ensureViewIndexTableCreated(physicalTableName, tableProps, families, splits, timestamp, isNamespaceMapped); } - + @Override public MetaDataMutationResult addColumn(final List<Mutation> tableMetaData, PTable table, Map<String, List<Pair<String,Object>>> stmtProperties, Set<String> colFamiliesForPColumnsToBeAdded) throws SQLException { - List<Pair<byte[], Map<String, Object>>> families = new ArrayList<>(stmtProperties.size()); - Map<String, Object> tableProps = new HashMap<String, Object>(); + List<Pair<byte[], Map<String, Object>>> families = new ArrayList<>(stmtProperties.size()); + Map<String, Object> tableProps = new HashMap<String, Object>(); Set<HTableDescriptor> tableDescriptors = Collections.emptySet(); Set<HTableDescriptor> origTableDescriptors = Collections.emptySet(); boolean nonTxToTx = false; @@ -1680,7 +1706,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement updateDescriptorForTx(table, tableProps, tableDescriptor, Boolean.TRUE.toString(), tableDescriptors, origTableDescriptors); } } - + boolean success = false; boolean metaDataUpdated = !tableDescriptors.isEmpty(); boolean pollingNeeded = !(!tableProps.isEmpty() && families.isEmpty() && colFamiliesForPColumnsToBeAdded.isEmpty()); @@ -1690,13 +1716,13 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement if (table.getType() == PTableType.VIEW) { boolean canViewsAddNewCF = props.getBoolean(QueryServices.ALLOW_VIEWS_ADD_NEW_CF_BASE_TABLE, QueryServicesOptions.DEFAULT_ALLOW_VIEWS_ADD_NEW_CF_BASE_TABLE); - // When adding a column to a view, base physical table should only be modified when new column families are being added. + // When adding a column to a view, base physical table should only be modified when new column families are being added. modifyHTable = canViewsAddNewCF && !existingColumnFamiliesForBaseTable(table.getPhysicalName()).containsAll(colFamiliesForPColumnsToBeAdded); } if (modifyHTable) { sendHBaseMetaData(tableDescriptors, pollingNeeded); } - + // Special case for call during drop table to ensure that the empty column family exists. // In this, case we only include the table header row, as until we add schemaBytes and tableBytes // as args to this function, we have no way of getting them in this case. @@ -1705,9 +1731,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement if ((tableMetaData.isEmpty()) || (tableMetaData.size() == 1 && tableMetaData.get(0).isEmpty())) { return new MetaDataMutationResult(MutationCode.NO_OP, System.currentTimeMillis(), table); } - byte[][] rowKeyMetaData = new byte[3][]; - PTableType tableType = table.getType(); - + byte[][] rowKeyMetaData = new byte[3][]; + PTableType tableType = table.getType(); + Mutation m = tableMetaData.get(0); byte[] rowKey = m.getRow(); SchemaUtil.getVarChars(rowKey, rowKeyMetaData); @@ -1715,37 +1741,37 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement byte[] schemaBytes = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX]; byte[] tableBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX]; byte[] tableKey = SchemaUtil.getTableKey(tenantIdBytes, schemaBytes, tableBytes); - + ImmutableBytesWritable ptr = new ImmutableBytesWritable(); result = metaDataCoprocessorExec(tableKey, - new Batch.Call<MetaDataService, MetaDataResponse>() { - @Override - public MetaDataResponse call(MetaDataService instance) throws IOException { - ServerRpcController controller = new ServerRpcController(); - BlockingRpcCallback<MetaDataResponse> rpcCallback = - new BlockingRpcCallback<MetaDataResponse>(); - AddColumnRequest.Builder builder = AddColumnRequest.newBuilder(); - for (Mutation m : tableMetaData) { - MutationProto mp = ProtobufUtil.toProto(m); - builder.addTableMetadataMutations(mp.toByteString()); - } - builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER)); - instance.addColumn(controller, builder.build(), rpcCallback); - if(controller.getFailedOn() != null) { - throw controller.getFailedOn(); - } - return rpcCallback.get(); + new Batch.Call<MetaDataService, MetaDataResponse>() { + @Override + public MetaDataResponse call(MetaDataService instance) throws IOException { + ServerRpcController controller = new ServerRpcController(); + BlockingRpcCallback<MetaDataResponse> rpcCallback = + new BlockingRpcCallback<MetaDataResponse>(); + AddColumnRequest.Builder builder = AddColumnRequest.newBuilder(); + for (Mutation m : tableMetaData) { + MutationProto mp = ProtobufUtil.toProto(m); + builder.addTableMetadataMutations(mp.toByteString()); } - }); - + builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER)); + instance.addColumn(controller, builder.build(), rpcCallback); + if(controller.getFailedOn() != null) { + throw controller.getFailedOn(); + } + return rpcCallback.get(); + } + }); + if (result.getMutationCode() == MutationCode.COLUMN_NOT_FOUND || result.getMutationCode() == MutationCode.TABLE_ALREADY_EXISTS) { // Success success = true; // Flush the table if transitioning DISABLE_WAL from TRUE to FALSE if ( MetaDataUtil.getMutationValue(m,PhoenixDatabaseMetaData.DISABLE_WAL_BYTES, kvBuilder, ptr) - && Boolean.FALSE.equals(PBoolean.INSTANCE.toObject(ptr))) { + && Boolean.FALSE.equals(PBoolean.INSTANCE.toObject(ptr))) { flushTable(table.getPhysicalName().getBytes()); } - + if (tableType == PTableType.TABLE) { // If we're changing MULTI_TENANT to true or false, create or drop the view index table if (MetaDataUtil.getMutationValue(m, PhoenixDatabaseMetaData.MULTI_TENANT_BYTES, kvBuilder, ptr)){ @@ -1851,7 +1877,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } } } - + private void sendHBaseMetaData(Set<HTableDescriptor> tableDescriptors, boolean pollingNeeded) throws SQLException { SQLException sqlE = null; for (HTableDescriptor descriptor : tableDescriptors) { @@ -1880,7 +1906,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } this.addCoprocessors(tableDescriptor.getName(), tableDescriptor, tableType, tableProps); } - + private Pair<HTableDescriptor,HTableDescriptor> separateAndValidateProperties(PTable table, Map<String, List<Pair<String, Object>>> properties, Set<String> colFamiliesForPColumnsToBeAdded, List<Pair<byte[], Map<String, Object>>> families, Map<String, Object> tableProps) throws SQLException { Map<String, Map<String, Object>> stmtFamiliesPropsMap = new HashMap<>(properties.size()); Map<String,Object> commonFamilyProps = new HashMap<>(); @@ -1909,7 +1935,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement if (!family.equals(QueryConstants.ALL_FAMILY_PROPERTIES_KEY)) { throw new SQLExceptionInfo.Builder(SQLExceptionCode.COLUMN_FAMILY_NOT_ALLOWED_TABLE_PROPERTY) .setMessage("Column Family: " + family + ", Property: " + propName).build() - .buildException(); + .buildException(); } tableProps.put(propName, propValue); } else { @@ -1933,7 +1959,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } } else { // invalid property - neither of HTableProp, HColumnProp or PhoenixTableProp - // FIXME: This isn't getting triggered as currently a property gets evaluated + // FIXME: This isn't getting triggered as currently a property gets evaluated // as HTableProp if its neither HColumnProp or PhoenixTableProp. throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ALTER_PROPERTY) .setMessage("Column Family: " + family + ", Property: " + propName).build() @@ -1945,7 +1971,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement if (!colFamilyPropsMap.isEmpty()) { stmtFamiliesPropsMap.put(family, colFamilyPropsMap); } - + } } commonFamilyProps = Collections.unmodifiableMap(commonFamilyProps); @@ -1967,13 +1993,13 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement m.putAll(commonFamilyProps); allFamiliesProps.put(colFamily, m); } else if (isAddingPkColOnly) { - // Setting HColumnProperty for a pk column is invalid + // Setting HColumnProperty for a pk column is invalid // because it will be part of the row key and not a key value column family. - // However, if both pk cols as well as key value columns are getting added + // However, if both pk cols as well as key value columns are getting added // together, then its allowed. The above if block will make sure that we add properties // only for the kv cols and not pk cols. throw new SQLExceptionInfo.Builder(SQLExceptionCode.SET_UNSUPPORTED_PROP_ON_ALTER_TABLE) - .build().buildException(); + .build().buildException(); } } } @@ -2004,8 +2030,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } // case when there is a column family being added but there are no props - // For ex - in DROP COLUMN when a new empty CF needs to be added since all - // the columns of the existing empty CF are getting dropped. Or the case + // For ex - in DROP COLUMN when a new empty CF needs to be added since all + // the columns of the existing empty CF are getting dropped. Or the case // when one is just adding a column for a column family like this: // ALTER TABLE ADD CF.COL for (String cf : colFamiliesForPColumnsToBeAdded) { @@ -2023,7 +2049,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement throw new SQLExceptionInfo.Builder(SQLExceptionCode.VIEW_WITH_PROPERTIES).build() .buildException(); } - + HTableDescriptor newTableDescriptor = null; HTableDescriptor origTableDescriptor = null; if (!allFamiliesProps.isEmpty() || !tableProps.isEmpty()) { @@ -2037,7 +2063,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } } if (addingColumns) { - // Make sure that all the CFs of the table have the same TTL as the empty CF. + // Make sure that all the CFs of the table have the same TTL as the empty CF. setTTLForNewCFs(allFamiliesProps, table, newTableDescriptor, newTTL); } // Set TTL on all table column families, even if they're not referenced here @@ -2061,7 +2087,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement if (isTransactional) { defaultTxMaxVersions = newTableDescriptor.getFamily(SchemaUtil.getEmptyColumnFamily(table)).getMaxVersions(); } else { - defaultTxMaxVersions = + defaultTxMaxVersions = this.getProps().getInt( QueryServices.MAX_VERSIONS_TRANSACTIONAL_ATTRIB, QueryServicesOptions.DEFAULT_MAX_VERSIONS_TRANSACTIONAL); @@ -2078,7 +2104,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } } } - // Set Tephra's TTL property based on HBase property if we're + // Set Tephra's TTL property based on HBase property if we're // transitioning to become transactional or setting TTL on // an already transactional table. if (isOrWillBeTransactional) { @@ -2121,7 +2147,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } return new Pair<>(origTableDescriptor, newTableDescriptor); } - + private void checkTransactionalVersionsValue(HColumnDescriptor colDescriptor) throws SQLException { int maxVersions = colDescriptor.getMaxVersions(); if (maxVersions <= 1) { @@ -2136,7 +2162,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement PTable table = latestMetaData.getTableRef(new PTableKey(null, baseTableName.getString())).getTable(); return existingColumnFamilies(table); } - + private HashSet<String> existingColumnFamilies(PTable table) { List<PColumnFamily> cfs = table.getColumnFamilies(); HashSet<String> cfNames = new HashSet<>(cfs.size()); @@ -2148,12 +2174,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement private static int getTTL(PTable table, HTableDescriptor tableDesc, Integer newTTL) throws SQLException { // If we're setting TTL now, then use that value. Otherwise, use empty column family value - int ttl = newTTL != null ? newTTL + int ttl = newTTL != null ? newTTL : tableDesc.getFamily(SchemaUtil.getEmptyColumnFamily(table)).getTimeToLive(); return ttl; } - - private static void setTTLForNewCFs(Map<String, Map<String, Object>> familyProps, PTable table, + + private static void setTTLForNewCFs(Map<String, Map<String, Object>> familyProps, PTable table, HTableDescriptor tableDesc, Integer newTTL) throws SQLException { if (!familyProps.isEmpty()) { int ttl = getTTL(table, tableDesc, newTTL); @@ -2166,7 +2192,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } } } - + @Override public MetaDataMutationResult dropColumn(final List<Mutation> tableMetaData, PTableType tableType) throws SQLException { byte[][] rowKeyMetadata = new byte[3][]; @@ -2176,25 +2202,25 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement byte[] tableBytes = rowKeyMetadata[PhoenixDatabaseMetaData.TABLE_NAME_INDEX]; byte[] tableKey = SchemaUtil.getTableKey(tenantIdBytes, schemaBytes, tableBytes); MetaDataMutationResult result = metaDataCoprocessorExec(tableKey, - new Batch.Call<MetaDataService, MetaDataResponse>() { - @Override - public MetaDataResponse call(MetaDataService instance) throws IOException { - ServerRpcController controller = new ServerRpcController(); - BlockingRpcCallback<MetaDataResponse> rpcCallback = - new BlockingRpcCallback<MetaDataResponse>(); - DropColumnRequest.Builder builder = DropColumnRequest.newBuilder(); - for (Mutation m : tableMetaData) { - MutationProto mp = ProtobufUtil.toProto(m); - builder.addTableMetadataMutations(mp.toByteString()); - } - builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER)); - instance.dropColumn(controller, builder.build(), rpcCallback); - if(controller.getFailedOn() != null) { - throw controller.getFailedOn(); - } - return rpcCallback.get(); + new Batch.Call<MetaDataService, MetaDataResponse>() { + @Override + public MetaDataResponse call(MetaDataService instance) throws IOException { + ServerRpcController controller = new ServerRpcController(); + BlockingRpcCallback<MetaDataResponse> rpcCallback = + new BlockingRpcCallback<MetaDataResponse>(); + DropColumnRequest.Builder builder = DropColumnRequest.newBuilder(); + for (Mutation m : tableMetaData) { + MutationProto mp = ProtobufUtil.toProto(m); + builder.addTableMetadataMutations(mp.toByteString()); } - }); + builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER)); + instance.dropColumn(controller, builder.build(), rpcCallback); + if(controller.getFailedOn() != null) { + throw controller.getFailedOn(); + } + return rpcCallback.get(); + } + }); final MutationCode code = result.getMutationCode(); switch(code) { case TABLE_ALREADY_EXISTS: @@ -2211,7 +2237,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement return result; } - + /** * This closes the passed connection. */ @@ -2244,15 +2270,15 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } return metaConnection; } - - /** + + /** * Keeping this to use for further upgrades. This method closes the oldMetaConnection. */ private PhoenixConnection addColumnsIfNotExists(PhoenixConnection oldMetaConnection, String tableName, long timestamp, String columns) throws SQLException { return addColumn(oldMetaConnection, tableName, timestamp, columns, true); } - + @Override public void init(final String url, final Properties props) throws SQLException { try { @@ -2282,32 +2308,31 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement boolean upgradeSystemTables = !Boolean.TRUE.equals(Boolean.valueOf(noUpgradeProp)); Properties scnProps = PropertiesUtil.deepCopy(props); scnProps.setProperty( - PhoenixRuntime.CURRENT_SCN_ATTRIB, - Long.toString(MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP)); + PhoenixRuntime.CURRENT_SCN_ATTRIB, + Long.toString(MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP)); scnProps.remove(PhoenixRuntime.TENANT_ID_ATTRIB); String globalUrl = JDBCUtil.removeProperty(url, PhoenixRuntime.TENANT_ID_ATTRIB); metaConnection = new PhoenixConnection( - ConnectionQueryServicesImpl.this, globalUrl, scnProps, newEmptyMetaData()); + ConnectionQueryServicesImpl.this, globalUrl, scnProps, newEmptyMetaData()); try (HBaseAdmin admin = getAdmin()) { boolean mappedSystemCatalogExists = admin .tableExists(SchemaUtil.getPhysicalTableName(SYSTEM_CATALOG_NAME_BYTES, true)); if (SchemaUtil.isNamespaceMappingEnabled(PTableType.SYSTEM, - ConnectionQueryServicesImpl.this.getProps())) { + ConnectionQueryServicesImpl.this.getProps())) { if (admin.tableExists(SYSTEM_CATALOG_NAME_BYTES)) { - //check if the server is already updated and have namespace config properly set. + //check if the server is already updated and have namespace config properly set. checkClientServerCompatibility(SYSTEM_CATALOG_NAME_BYTES); } ensureSystemTablesUpgraded(ConnectionQueryServicesImpl.this.getProps()); } else if (mappedSystemCatalogExists) { throw new SQLExceptionInfo.Builder( - SQLExceptionCode.INCONSISTENET_NAMESPACE_MAPPING_PROPERTIES) + SQLExceptionCode.INCONSISTENET_NAMESPACE_MAPPING_PROPERTIES) .setMessage("Cannot initiate connection as " + SchemaUtil.getPhysicalTableName( - SYSTEM_CATALOG_NAME_BYTES, true) - + " is found but client does not have " - + IS_NAMESPACE_MAPPING_ENABLED + " enabled") - .build().buildException(); } + SYSTEM_CATALOG_NAME_BYTES, true) + + " is found but client does not have " + + IS_NAMESPACE_MAPPING_ENABLED + " enabled") + .build().buildException(); } } - try { metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_TABLE_METADATA); @@ -2344,10 +2369,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } // If the server side schema is before MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0 then - // we need to add INDEX_TYPE and INDEX_DISABLE_TIMESTAMP columns too. - // TODO: Once https://issues.apache.org/jira/browse/PHOENIX-1614 is fixed, - // we should just have a ALTER TABLE ADD IF NOT EXISTS statement with all - // the column names that have been added to SYSTEM.CATALOG since 4.0. + // we need to add INDEX_TYPE and INDEX_DISABLE_TIMESTAMP columns too. + // TODO: Once https://issues.apache.org/jira/browse/PHOENIX-1614 is fixed, + // we should just have a ALTER TABLE ADD IF NOT EXISTS statement with all + // the column names that have been added to SYSTEM.CATALOG since 4.0. if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0) { columnsToAdd = addColumn(columnsToAdd, PhoenixDatabaseMetaData.INDEX_TYPE + " " + PUnsignedTinyint.INSTANCE.getSqlTypeName() + ", " + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP + " " + PLong.INSTANCE.getSqlTypeName()); @@ -2370,7 +2395,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_5_0, columnsToAdd, false); upgradeTo4_5_0(metaConnection); } catch (ColumnAlreadyExistsException ignored) { - /* + /* * Upgrade to 4.5 is a slightly special case. We use the fact that the column * BASE_COLUMN_COUNT is already part of the meta-data schema as the signal that * the server side upgrade has finished or is in progress. @@ -2406,18 +2431,18 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } if(currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0) { // Drop old stats table so that new stats table is created - metaConnection = dropStatsTable(metaConnection, - MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 4); - metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG, - MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 3, - PhoenixDatabaseMetaData.TRANSACTIONAL + " " + PBoolean.INSTANCE.getSqlTypeName()); - metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG, - MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 2, - PhoenixDatabaseMetaData.UPDATE_CACHE_FREQUENCY + " " + PLong.INSTANCE.getSqlTypeName()); - metaConnection = setImmutableTableIndexesImmutable(metaConnection, - MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 1); - metaConnection = updateSystemCatalogTimestamp(metaConnection, - MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0); + metaConnection = dropStatsTable(metaConnection, + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 4); + metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG, + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 3, + PhoenixDatabaseMetaData.TRANSACTIONAL + " " + PBoolean.INSTANCE.getSqlTypeName()); + metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG, + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 2, + PhoenixDatabaseMetaData.UPDATE_CACHE_FREQUENCY + " " + PLong.INSTANCE.getSqlTypeName()); + metaConnection = setImmutableTableIndexesImmutable(metaConnection, + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 1); + metaConnection = updateSystemCatalogTimestamp(metaConnection, + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0); ConnectionQueryServicesImpl.this.removeTable(null, PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, null, MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0); clearCache(); } @@ -2452,7 +2477,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } int nSaltBuckets = ConnectionQueryServicesImpl.this.props.getInt(QueryServices.SEQUENCE_SALT_BUCKETS_ATTRIB, - QueryServicesOptions.DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS); + QueryServicesOptions.DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS); try { String createSequenceTable = Sequence.getCreateTableStatement(nSaltBuckets); metaConnection.createStatement().executeUpdate(createSequenceTable); @@ -2469,57 +2494,57 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0) { // If the table time stamp is before 4.1.0 then we need to add below columns // to the SYSTEM.SEQUENCE table. - String columnsToAdd = PhoenixDatabaseMetaData.MIN_VALUE + " " + PLong.INSTANCE.getSqlTypeName() + String columnsToAdd = PhoenixDatabaseMetaData.MIN_VALUE + " " + PLong.INSTANCE.getSqlTypeName() + ", " + PhoenixDatabaseMetaData.MAX_VALUE + " " + PLong.INSTANCE.getSqlTypeName() + ", " + PhoenixDatabaseMetaData.CYCLE_FLAG + " " + PBoolean.INSTANCE.getSqlTypeName() + ", " + PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG + " " + PBoolean.INSTANCE.getSqlTypeName(); addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG, - MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP, columnsToAdd); + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP, columnsToAdd); } // If the table timestamp is before 4.2.1 then run the upgrade script if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_2_1) { if (UpgradeUtil.upgradeSequenceTable(metaConnection, nSaltBuckets, e.getTable())) { metaConnection.removeTable(null, - PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_SCHEMA, - PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_TABLE, - MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP); + PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_SCHEMA, + PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_TABLE, + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP); clearTableFromCache(ByteUtil.EMPTY_BYTE_ARRAY, - PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_SCHEMA_BYTES, - PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_TABLE_BYTES, - MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP); + PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_SCHEMA_BYTES, + PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_TABLE_BYTES, + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP); clearTableRegionCache(PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_NAME_BYTES); } nSequenceSaltBuckets = nSaltBuckets; - } else { + } else { nSequenceSaltBuckets = getSaltBuckets(e); } } } try { metaConnection.createStatement().executeUpdate( - QueryConstants.CREATE_STATS_TABLE_METADATA); + QueryConstants.CREATE_STATS_TABLE_METADATA); } catch (NewerTableAlreadyExistsException ignore) { } catch(TableAlreadyExistsException e) { if (upgradeSystemTables) { long currentServerSideTableTimeStamp = e.getTable().getTimeStamp(); if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_3_0) { metaConnection = addColumnsIfNotExists( - metaConnection, - PhoenixDatabaseMetaData.SYSTEM_STATS_NAME, - MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP, - PhoenixDatabaseMetaData.GUIDE_POSTS_ROW_COUNT + " " - + PLong.INSTANCE.getSqlTypeName()); + metaConnection, + PhoenixDatabaseMetaData.SYSTEM_STATS_NAME, + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP, + PhoenixDatabaseMetaData.GUIDE_POSTS_ROW_COUNT + " " + + PLong.INSTANCE.getSqlTypeName()); } } } try { metaConnection.createStatement().executeUpdate( - QueryConstants.CREATE_FUNCTION_METADATA); + QueryConstants.CREATE_FUNCTION_METADATA); } catch (NewerTableAlreadyExistsException e) { } catch (TableAlreadyExistsException e) { } if (SchemaUtil.isNamespaceMappingEnabled(PTableType.SYSTEM, - ConnectionQueryServicesImpl.this.getProps())) { + ConnectionQueryServicesImpl.this.getProps())) { try { metaConnection.createStatement().executeUpdate("CREATE SCHEMA IF NOT EXISTS " + PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA); @@ -2611,7 +2636,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } return tableNames; } - + private String addColumn(String columnsToAddSoFar, String columns) { if (columnsToAddSoFar == null || columnsToAddSoFar.isEmpty()) { return columns; @@ -2624,12 +2649,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement if (isRenewingLeasesEnabled()) { ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true) - .setNameFormat("PHOENIX-SCANNER-RENEW-LEASE" + "-thread-%s").build(); + .setNameFormat("PHOENIX-SCANNER-RENEW-LEASE" + "-thread-%s").build(); renewLeaseExecutor = Executors.newScheduledThreadPool(renewLeasePoolSize, threadFactory); for (LinkedBlockingQueue<WeakReference<PhoenixConnection>> q : connectionQueues) { renewLeaseExecutor.scheduleAtFixedRate(new RenewLeaseTask(q), 0, - renewLeaseTaskFrequency, TimeUnit.MILLISECONDS); + renewLeaseTaskFrequency, TimeUnit.MILLISECONDS); } } } @@ -2648,18 +2673,18 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement try { metaConnection.setAutoCommit(true); metaConnection.createStatement().execute( - "UPSERT INTO SYSTEM.CATALOG(TENANT_ID, TABLE_SCHEM, TABLE_NAME, COLUMN_NAME, COLUMN_FAMILY, IMMUTABLE_ROWS)\n" + - "SELECT A.TENANT_ID, A.TABLE_SCHEM,B.COLUMN_FAMILY,null,null,true\n" + - "FROM SYSTEM.CATALOG A JOIN SYSTEM.CATALOG B ON (\n" + - " A.TENANT_ID = B.TENANT_ID AND \n" + - " A.TABLE_SCHEM = B.TABLE_SCHEM AND\n" + - " A.TABLE_NAME = B.TABLE_NAME AND\n" + - " A.COLUMN_NAME = B.COLUMN_NAME AND\n" + - " B.LINK_TYPE = 1\n" + - ")\n" + - "WHERE A.COLUMN_FAMILY IS NULL AND\n" + - " B.COLUMN_FAMILY IS NOT NULL AND\n" + - " A.IMMUTABLE_ROWS = TRUE"); + "UPSERT INTO SYSTEM.CATALOG(TENANT_ID, TABLE_SCHEM, TABLE_NAME, COLUMN_NAME, COLUMN_FAMILY, IMMUTABLE_ROWS)\n" + + "SELECT A.TENANT_ID, A.TABLE_SCHEM,B.COLUMN_FAMILY,null,null,true\n" + + "FROM SYSTEM.CATALOG A JOIN SYSTEM.CATALOG B ON (\n" + + " A.TENANT_ID = B.TENANT_ID AND \n" + + " A.TABLE_SCHEM = B.TABLE_SCHEM AND\n" + + " A.TABLE_NAME = B.TABLE_NAME AND\n" + + " A.COLUMN_NAME = B.COLUMN_NAME AND\n" + + " B.LINK_TYPE = 1\n" + + ")\n" + + "WHERE A.COLUMN_FAMILY IS NULL AND\n" + + " B.COLUMN_FAMILY IS NOT NULL AND\n" + + " A.IMMUTABLE_ROWS = TRUE"); } catch (SQLException e) { logger.warn("exception during upgrading stats table:" + e); sqlE = e; @@ -2699,8 +2724,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement try { metaConnection.setAutoCommit(true); metaConnection.createStatement().execute( - "UPSERT INTO SYSTEM.CATALOG(TENANT_ID, TABLE_SCHEM, TABLE_NAME, COLUMN_NAME, COLUMN_FAMILY, DISABLE_WAL)\n" + - "VALUES (NULL, '" + QueryConstants.SYSTEM_SCHEMA_NAME + "','" + PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE + "', NULL, NULL, FALSE)"); + "UPSERT INTO SYSTEM.CATALOG(TENANT_ID, TABLE_SCHEM, TABLE_NAME, COLUMN_NAME, COLUMN_FAMILY, DISABLE_WAL)\n" + + "VALUES (NULL, '" + QueryConstants.SYSTEM_SCHEMA_NAME + "','" + PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE + "', NULL, NULL, FALSE)"); } catch (SQLException e) { logger.warn("exception during upgrading stats table:" + e); sqlE = e; @@ -2723,46 +2748,46 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } private PhoenixConnection dropStatsTable(PhoenixConnection oldMetaConnection, long timestamp) - throws SQLException, IOException { - Properties props = PropertiesUtil.deepCopy(oldMetaConnection.getClientInfo()); - props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(timestamp)); - PhoenixConnection metaConnection = new PhoenixConnection(oldMetaConnection, this, props); - SQLException sqlE = null; - boolean wasCommit = metaConnection.getAutoCommit(); - try { - metaConnection.setAutoCommit(true); - metaConnection.createStatement() - .executeUpdate("DELETE FROM " + PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME + " WHERE " - + PhoenixDatabaseMetaData.TABLE_NAME + "='" + PhoenixDatabaseMetaData.SYSTEM_STATS_TABLE - + "' AND " + PhoenixDatabaseMetaData.TABLE_SCHEM + "='" - + PhoenixDatabaseMetaData.SYSTEM_SCHEMA_NAME + "'"); - } catch (SQLException e) { - logger.warn("exception during upgrading stats table:" + e); - sqlE = e; - } finally { - try { - metaConnection.setAutoCommit(wasCommit); - oldMetaConnection.close(); - } catch (SQLException e) { - if (sqlE != null) { - sqlE.setNextException(e); - } else { - sqlE = e; - } - } - if (sqlE != null) { - throw sqlE; - } - } - return metaConnection; - } - + throws SQLException, IOException { + Properties props = PropertiesUtil.deepCopy(oldMetaConnection.getClientInfo()); + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(timestamp)); + PhoenixConnection metaConnection = new PhoenixConnection(oldMetaConnection, this, props); + SQLException sqlE = null; + boolean wasCommit = metaConnection.getAutoCommit(); + try { + metaConnection.setAutoCommit(true); + metaConnection.createStatement() + .executeUpdate("DELETE FROM " + PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME + " WHERE " + + PhoenixDatabaseMetaData.TABLE_NAME + "='" + PhoenixDatabaseMetaData.SYSTEM_STATS_TABLE + + "' AND " + PhoenixDatabaseMetaData.TABLE_SCHEM + "='" + + PhoenixDatabaseMetaData.SYSTEM_SCHEMA_NAME + "'"); + } catch (SQLException e) { + logger.warn("exception during upgrading stats table:" + e); + sqlE = e; + } finally { + try { + metaConnection.setAutoCommit(wasCommit); + oldMetaConnection.close(); + } catch (SQLException e) { + if (sqlE != null) { + sqlE.setNextException(e); + } else { + sqlE = e; + } + } + if (sqlE != null) { + throw sqlE; + } + } + return metaConnection; + } + private static int getSaltBuckets(TableAlreadyExistsException e) { PTable table = e.getTable(); Integer sequenceSaltBuckets = table == null ? null : table.getBucketNum(); return sequenceSaltBuckets == null ? 0 : sequenceSaltBuckets; } - + @Override public MutationState updateData(MutationPlan plan) throws SQLException { MutationState state = plan.execute(); @@ -2792,22 +2817,22 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement .getPhysicalName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, this.getProps()).getName()); try { final Map<byte[], Long> results = - htable.coprocessorService(MetaDataService.class, HConstants.EMPTY_START_ROW, - HConstants.EMPTY_END_ROW, new Batch.Call<MetaDataService, Long>() { - @Override - public Long call(MetaDataService instance) throws IOException { - ServerRpcController controller = new ServerRpcController(); - BlockingRpcCallback<ClearCacheResponse> rpcCallback = - new BlockingRpcCallback<ClearCacheResponse>(); - ClearCacheRequest.Builder builder = ClearCacheRequest.newBuilder(); - builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER)); - instance.clearCache(controller, builder.build(), rpcCallback); - if(controller.getFailedOn() != null) { - throw controller.getFailedOn(); + htable.coprocessorService(MetaDataService.class, HConstants.EMPTY_START_ROW, + HConstants.EMPTY_END_ROW, new Batch.Call<MetaDataService, Long>() { + @Override + public Long call(MetaDataService instance) throws IOException { + ServerRpcController controller = new ServerRpcController(); + BlockingRpcCallback<ClearCacheResponse> rpcCallback = + new BlockingRpcCallback<ClearCacheResponse>(); + ClearCacheRequest.Builder builder = ClearCacheRequest.newBuilder(); + builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER)); + instance.clearCache(controller, builder.build(), rpcCallback); + if(controller.getFailedOn() != null) { + throw controller.getFailedOn(); + } + return rpcCallback.get().getUnfreedBytes(); } - return rpcCallback.get().getUnfreedBytes(); - } - }); + }); long unfreedBytes = 0; for (Map.Entry<byte[],Long> result : results.entrySet()) { if (result.getValue() != null) { @@ -2851,7 +2876,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement // restore the interrupt status Thread.currentThread().interrupt(); throw new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION).setRootCause(e).build() - .buildException(); + .buildException(); } finally { Closeables.closeQuietly(admin); } @@ -3088,21 +3113,21 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement try { htable.coprocessorService(MetaDataService.class, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, new Batch.Call<MetaDataService, ClearTableFromCacheResponse>() { - @Override - public ClearTableFromCacheResponse call(MetaDataService instance) throws IOException { - ServerRpcController controller = new ServerRpcController(); - BlockingRpcCallback<ClearTableFromCacheResponse> rpcCallback = new BlockingRpcCallback<ClearTableFromCacheResponse>(); - ClearTableFromCacheRequest.Builder builder = ClearTableFromCacheRequest.newBuilder(); - builder.setTenantId(ByteStringer.wrap(tenantId)); - builder.setTableName(ByteStringer.wrap(tableName)); - builder.setSchemaName(ByteStringer.wrap(schemaName)); -
<TRUNCATED>
