Repository: phoenix Updated Branches: refs/heads/master 88196ca9f -> 9c4d945e1
PHOENIX-1408 Don't disable table before modifying HTable metadata (Samarth Jain) Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/152f98fe Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/152f98fe Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/152f98fe Branch: refs/heads/master Commit: 152f98fe71d494e9a80cb619dfd7170a85dc5ffe Parents: 88196ca Author: James Taylor <[email protected]> Authored: Tue Nov 25 17:34:45 2014 -0800 Committer: James Taylor <[email protected]> Committed: Tue Nov 25 19:40:20 2014 -0800 ---------------------------------------------------------------------- .../apache/phoenix/end2end/AlterTableIT.java | 19 +- .../query/ConnectionQueryServicesImpl.java | 190 ++++++++++++++++--- .../org/apache/phoenix/query/QueryServices.java | 3 + .../phoenix/query/QueryServicesOptions.java | 28 ++- 4 files changed, 212 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/152f98fe/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java index f711bd4..2943fe6 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java @@ -911,4 +911,21 @@ public class AlterTableIT extends BaseHBaseManagedTimeIT { ddl = "ALTER TABLE T ADD STRING_ARRAY1 VARCHAR[]"; conn1.createStatement().execute(ddl); conn1.close(); - }} + } + + @Test + public void testAddColumnForNewColumnFamily() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + String ddl = "CREATE TABLE T (\n" + +"ID1 VARCHAR(15) NOT NULL,\n" + +"ID2 VARCHAR(15) NOT 
NULL,\n" + +"CREATED_DATE DATE,\n" + +"CREATION_TIME BIGINT,\n" + +"LAST_USED DATE,\n" + +"CONSTRAINT PK PRIMARY KEY (ID1, ID2)) SALT_BUCKETS = 8"; + Connection conn1 = DriverManager.getConnection(getUrl(), props); + conn1.createStatement().execute(ddl); + ddl = "ALTER TABLE T ADD CF.STRING VARCHAR"; + conn1.createStatement().execute(ddl); + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/152f98fe/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index 7036909..a7444bb 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -34,6 +34,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import javax.annotation.concurrent.GuardedBy; @@ -141,6 +142,7 @@ import org.apache.phoenix.util.UpgradeUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Stopwatch; import com.google.common.base.Throwables; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; @@ -476,6 +478,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } latestMetaDataLock.wait(waitTime); } catch (InterruptedException e) { + // restore the interrupt status + Thread.currentThread().interrupt(); throw new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION) .setRootCause(e).build().buildException(); // FIXME } @@ -675,11 +679,11 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement try { admin = new HBaseAdmin(config); try { - HTableDescriptor existingDesc = admin.getTableDescriptor(tableName); - HColumnDescriptor oldDescriptor = existingDesc.getFamily(family.getFirst()); - HColumnDescriptor columnDescriptor = null; + HTableDescriptor existingTableDesc = admin.getTableDescriptor(tableName); + HColumnDescriptor oldColumnDesc = existingTableDesc.getFamily(family.getFirst()); + HColumnDescriptor newColumnDesc = null; - if (oldDescriptor == null) { + if (oldColumnDesc == null) { if (tableType == PTableType.VIEW) { String fullTableName = Bytes.toString(tableName); throw new ReadOnlyTableException( @@ -688,30 +692,30 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement SchemaUtil.getTableNameFromFullName(fullTableName), Bytes.toString(family.getFirst())); } - columnDescriptor = generateColumnFamilyDescriptor(family, tableType ); + newColumnDesc = generateColumnFamilyDescriptor(family, tableType ); } else { - columnDescriptor = new HColumnDescriptor(oldDescriptor); + newColumnDesc = new HColumnDescriptor(oldColumnDesc); // Don't attempt to make any metadata changes for a VIEW if (tableType == PTableType.VIEW) { return; } - modifyColumnFamilyDescriptor(columnDescriptor, family); + modifyColumnFamilyDescriptor(newColumnDesc, family); } - if (columnDescriptor.equals(oldDescriptor)) { + if (newColumnDesc.equals(oldColumnDesc)) { // Table already has family and it's the same. 
return; } - admin.disableTable(tableName); - if (oldDescriptor == null) { - admin.addColumn(tableName, columnDescriptor); - } else { - admin.modifyColumn(tableName, columnDescriptor); - } - admin.enableTable(tableName); + addOrModifyColumnDescriptor(tableName, admin, oldColumnDesc, newColumnDesc); } catch (org.apache.hadoop.hbase.TableNotFoundException e) { sqlE = new SQLExceptionInfo.Builder(SQLExceptionCode.TABLE_UNDEFINED).setRootCause(e).build().buildException(); - } + } catch (InterruptedException e) { + // restore the interrupt status + Thread.currentThread().interrupt(); + sqlE = new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION).setRootCause(e).build().buildException(); + } catch (TimeoutException e) { + sqlE = new SQLExceptionInfo.Builder(SQLExceptionCode.OPERATION_TIMED_OUT).setRootCause(e.getCause() != null ? e.getCause() : e).build().buildException(); + } } catch (IOException e) { sqlE = ServerUtil.parseServerException(e); } finally { @@ -733,6 +737,118 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } } + private void addOrModifyColumnDescriptor(byte[] tableName, HBaseAdmin admin, HColumnDescriptor oldColumnDesc, + HColumnDescriptor newColumnDesc) throws IOException, org.apache.hadoop.hbase.TableNotFoundException, + InterruptedException, TimeoutException { + boolean isOnlineSchemaUpgradeEnabled = ConnectionQueryServicesImpl.this.props.getBoolean( + QueryServices.ALLOW_ONLINE_TABLE_SCHEMA_UPDATE, + QueryServicesOptions.DEFAULT_ALLOW_ONLINE_TABLE_SCHEMA_UPDATE); + if (!isOnlineSchemaUpgradeEnabled) { + admin.disableTable(tableName); + if (oldColumnDesc == null) { + admin.addColumn(tableName, newColumnDesc); + } else { + admin.modifyColumn(tableName, newColumnDesc); + } + admin.enableTable(tableName); + } else { + if (oldColumnDesc == null) { + admin.addColumn(tableName, newColumnDesc); + } else { + admin.modifyColumn(tableName, newColumnDesc); + } + pollForUpdatedColumnDescriptor(admin, tableName, newColumnDesc); + } + } + + private 
static interface RetriableOperation { + boolean checkForCompletion() throws TimeoutException, org.apache.hadoop.hbase.TableNotFoundException, IOException; + String getOperationName(); + } + + private void pollForUpdatedTableDescriptor(final HBaseAdmin admin, final HTableDescriptor newTableDescriptor, + final byte[] tableName) throws InterruptedException, TimeoutException { + checkAndRetry(new RetriableOperation() { + + @Override + public String getOperationName() { + return "UpdateOrNewTableDescriptor"; + } + + @Override + public boolean checkForCompletion() throws TimeoutException, + org.apache.hadoop.hbase.TableNotFoundException, IOException { + HTableDescriptor tableDesc = admin.getTableDescriptor(tableName); + return newTableDescriptor.equals(tableDesc); + } + }); + } + + private void pollForUpdatedColumnDescriptor(final HBaseAdmin admin, final byte[] tableName, + final HColumnDescriptor columnFamilyDesc) throws InterruptedException, TimeoutException { + checkAndRetry(new RetriableOperation() { + + @Override + public String getOperationName() { + return "UpdateOrNewColumnDescriptor"; + } + + @Override + public boolean checkForCompletion() throws TimeoutException, + org.apache.hadoop.hbase.TableNotFoundException, IOException { + HTableDescriptor newTableDesc = admin.getTableDescriptor(tableName); + return newTableDesc.getFamilies().contains(columnFamilyDesc); + } + }); + } + + private void checkAndRetry(RetriableOperation op) throws InterruptedException, TimeoutException { + int maxRetries = ConnectionQueryServicesImpl.this.props.getInt( + QueryServices.NUM_RETRIES_FOR_SCHEMA_UPDATE_CHECK, + QueryServicesOptions.DEFAULT_RETRIES_FOR_SCHEMA_UPDATE_CHECK); + long sleepInterval = ConnectionQueryServicesImpl.this.props + .getLong(QueryServices.DELAY_FOR_SCHEMA_UPDATE_CHECK, + QueryServicesOptions.DEFAULT_DELAY_FOR_SCHEMA_UPDATE_CHECK); + boolean success = false; + int numTries = 1; + Stopwatch watch = new Stopwatch(); + watch.start(); + do { + try { + success = 
op.checkForCompletion(); + } catch (Exception ex) { + // If we encounter any exception on the first or last try, propagate the exception and fail. + // Else, we swallow the exception and retry till we reach maxRetries. + if (numTries == 1 || numTries == maxRetries) { + watch.stop(); + TimeoutException toThrow = new TimeoutException("Operation " + op.getOperationName() + + " didn't complete because of exception. Time elapsed: " + watch.elapsedMillis()); + toThrow.initCause(ex); + throw toThrow; + } + } + numTries++; + if (!success) { Thread.sleep(sleepInterval); } // only wait when another poll is needed + } while (numTries < maxRetries && !success); + + watch.stop(); + + if (!success) { + throw new TimeoutException("Operation " + op.getOperationName() + " didn't complete within " + + watch.elapsedMillis() + " ms " + + (numTries > 1 ? ("after retrying " + numTries + (numTries > 1 ? " times." : " time.")) : "")); + } else { + if (logger.isDebugEnabled()) { + logger.debug("Operation " + + op.getOperationName() + + " completed within " + + watch.elapsedMillis() + + "ms " + + (numTries > 1 ? ("after retrying " + numTries + (numTries > 1 ? " times." 
: " time.")) : "")); + } + } + } + /** * * @param tableName @@ -794,16 +909,18 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement return existingDesc; } - // TODO: Take advantage of online schema change ability by setting "hbase.online.schema.update.enable" to true - admin.disableTable(tableName); - admin.modifyTable(tableName, newDesc); - admin.enableTable(tableName); - + modifyTable(tableName, admin, newDesc); return newDesc; } } catch (IOException e) { sqlE = ServerUtil.parseServerException(e); + } catch (InterruptedException e) { + // restore the interrupt status + Thread.currentThread().interrupt(); + sqlE = new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION).setRootCause(e).build().buildException(); + } catch (TimeoutException e) { + sqlE = new SQLExceptionInfo.Builder(SQLExceptionCode.OPERATION_TIMED_OUT).setRootCause(e.getCause() != null ? e.getCause() : e).build().buildException(); } finally { try { if (admin != null) { @@ -823,6 +940,21 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } return null; // will never make it here } + + private void modifyTable(byte[] tableName, HBaseAdmin admin, HTableDescriptor newDesc) throws IOException, + org.apache.hadoop.hbase.TableNotFoundException, InterruptedException, TimeoutException { + boolean isOnlineSchemaUpgradeEnabled = ConnectionQueryServicesImpl.this.props.getBoolean( + QueryServices.ALLOW_ONLINE_TABLE_SCHEMA_UPDATE, + QueryServicesOptions.DEFAULT_ALLOW_ONLINE_TABLE_SCHEMA_UPDATE); + if (!isOnlineSchemaUpgradeEnabled) { + admin.disableTable(tableName); + admin.modifyTable(tableName, newDesc); + admin.enableTable(tableName); + } else { + admin.modifyTable(tableName, newDesc); + pollForUpdatedTableDescriptor(admin, newDesc, tableName); + } + } private static boolean isInvalidMutableIndexConfig(Long serverVersion) { if (serverVersion == null) { @@ -1671,6 +1803,8 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement } catch (IOException e) { throw new PhoenixIOException(e); } catch (InterruptedException e) { + // restore the interrupt status + Thread.currentThread().interrupt(); throw new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION).setRootCause(e).build() .buildException(); } finally { @@ -1858,9 +1992,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement SQLException sqlE = null; try { resultObjects= hTable.batch(incrementBatch); - } catch (IOException e){ + } catch (IOException e) { sqlE = ServerUtil.parseServerException(e); - } catch (InterruptedException e){ + } catch (InterruptedException e) { + // restore the interrupt status + Thread.currentThread().interrupt(); sqlE = new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION) .setRootCause(e).build().buildException(); // FIXME ? } finally { @@ -1980,6 +2116,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } catch (IOException e){ sqlE = ServerUtil.parseServerException(e); } catch (InterruptedException e){ + // restore the interrupt status + Thread.currentThread().interrupt(); sqlE = new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION) .setRootCause(e).build().buildException(); // FIXME ? } finally { @@ -2027,9 +2165,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement SQLException sqlE = null; try { hTable.batch(mutations); - } catch (IOException e){ + } catch (IOException e) { sqlE = ServerUtil.parseServerException(e); - } catch (InterruptedException e){ + } catch (InterruptedException e) { + // restore the interrupt status + Thread.currentThread().interrupt(); sqlE = new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION) .setRootCause(e).build().buildException(); // FIXME ? 
} finally { http://git-wip-us.apache.org/repos/asf/phoenix/blob/152f98fe/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index ab1a8e5..eb72a83 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -141,6 +141,9 @@ public interface QueryServices extends SQLCloseable { public static final String SEQUENCE_SALT_BUCKETS_ATTRIB = "phoenix.sequence.saltBuckets"; public static final String COPROCESSOR_PRIORITY_ATTRIB = "phoenix.coprocessor.priority"; public static final String EXPLAIN_CHUNK_COUNT_ATTRIB = "phoenix.explain.displayChunkCount"; + public static final String ALLOW_ONLINE_TABLE_SCHEMA_UPDATE = "hbase.online.schema.update.enable"; + public static final String NUM_RETRIES_FOR_SCHEMA_UPDATE_CHECK = "phoenix.schema.change.retries"; + public static final String DELAY_FOR_SCHEMA_UPDATE_CHECK = "phoenix.schema.change.delay"; /** * Get executor service used for parallel scans http://git-wip-us.apache.org/repos/asf/phoenix/blob/152f98fe/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index 460b199..7cfa3aa 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -17,9 +17,11 @@ */ package org.apache.phoenix.query; +import static org.apache.phoenix.query.QueryServices.ALLOW_ONLINE_TABLE_SCHEMA_UPDATE; import static 
org.apache.phoenix.query.QueryServices.CALL_QUEUE_PRODUCER_ATTRIB_NAME; import static org.apache.phoenix.query.QueryServices.CALL_QUEUE_ROUND_ROBIN_ATTRIB; import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_ATTRIB; +import static org.apache.phoenix.query.QueryServices.DELAY_FOR_SCHEMA_UPDATE_CHECK; import static org.apache.phoenix.query.QueryServices.DROP_METADATA_ATTRIB; import static org.apache.phoenix.query.QueryServices.EXPLAIN_CHUNK_COUNT_ATTRIB; import static org.apache.phoenix.query.QueryServices.GROUPBY_MAX_CACHE_SIZE_ATTRIB; @@ -40,6 +42,7 @@ import static org.apache.phoenix.query.QueryServices.MAX_SPOOL_TO_DISK_BYTES_ATT import static org.apache.phoenix.query.QueryServices.MAX_TENANT_MEMORY_PERC_ATTRIB; import static org.apache.phoenix.query.QueryServices.MIN_STATS_UPDATE_FREQ_MS_ATTRIB; import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB; +import static org.apache.phoenix.query.QueryServices.NUM_RETRIES_FOR_SCHEMA_UPDATE_CHECK; import static org.apache.phoenix.query.QueryServices.QUEUE_SIZE_ATTRIB; import static org.apache.phoenix.query.QueryServices.REGIONSERVER_INFO_PORT_ATTRIB; import static org.apache.phoenix.query.QueryServices.REGIONSERVER_LEASE_PERIOD_ATTRIB; @@ -167,7 +170,10 @@ public class QueryServicesOptions { */ public static final int DEFAULT_COPROCESSOR_PRIORITY = Coprocessor.PRIORITY_SYSTEM/2 + Coprocessor.PRIORITY_USER/2; // Divide individually to prevent any overflow public static final boolean DEFAULT_EXPLAIN_CHUNK_COUNT = true; - + public static final boolean DEFAULT_ALLOW_ONLINE_TABLE_SCHEMA_UPDATE = true; + public static final int DEFAULT_RETRIES_FOR_SCHEMA_UPDATE_CHECK = 10; + public static final long DEFAULT_DELAY_FOR_SCHEMA_UPDATE_CHECK = 5 * 1000; // 5 seconds. 
+ private final Configuration config; private QueryServicesOptions(Configuration config) { @@ -215,6 +221,9 @@ public class QueryServicesOptions { .setIfUnset(GROUPBY_SPILL_FILES_ATTRIB, DEFAULT_GROUPBY_SPILL_FILES) .setIfUnset(SEQUENCE_CACHE_SIZE_ATTRIB, DEFAULT_SEQUENCE_CACHE_SIZE) .setIfUnset(SCAN_RESULT_CHUNK_SIZE, DEFAULT_SCAN_RESULT_CHUNK_SIZE) + .setIfUnset(ALLOW_ONLINE_TABLE_SCHEMA_UPDATE, DEFAULT_ALLOW_ONLINE_TABLE_SCHEMA_UPDATE) + .setIfUnset(NUM_RETRIES_FOR_SCHEMA_UPDATE_CHECK, DEFAULT_RETRIES_FOR_SCHEMA_UPDATE_CHECK) + .setIfUnset(DELAY_FOR_SCHEMA_UPDATE_CHECK, DEFAULT_DELAY_FOR_SCHEMA_UPDATE_CHECK) ; // HBase sets this to 1, so we reset it to something more appropriate. // Hopefully HBase will change this, because we can't know if a user set @@ -413,7 +422,7 @@ public class QueryServicesOptions { public int getSpillableGroupByNumSpillFiles() { return config.getInt(GROUPBY_SPILL_FILES_ATTRIB, DEFAULT_GROUPBY_SPILL_FILES); } - + public QueryServicesOptions setMaxServerCacheTTLMs(int ttl) { return set(MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB, ttl); } @@ -468,4 +477,19 @@ public class QueryServicesOptions { return this; } + public QueryServicesOptions setAllowOnlineSchemaUpdate(boolean allow) { + config.setBoolean(ALLOW_ONLINE_TABLE_SCHEMA_UPDATE, allow); + return this; + } + + public QueryServicesOptions setNumRetriesForSchemaChangeCheck(int numRetries) { + config.setInt(NUM_RETRIES_FOR_SCHEMA_UPDATE_CHECK, numRetries); + return this; + } + + public QueryServicesOptions setDelayInMillisForSchemaChangeCheck(long delayInMillis) { + config.setLong(DELAY_FOR_SCHEMA_UPDATE_CHECK, delayInMillis); + return this; + } + }
