Repository: phoenix Updated Branches: refs/heads/4.x-HBase-0.98 4beb182db -> 6bcee5776
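[Editor's note: a recurring change in this commit is that MetaDataRegionObserver.updateDisableTimestamp() moves from a checkAndPut keyed on INDEX_STATE to an atomic checkAndMutate keyed on INDEX_DISABLE_TIMESTAMP itself. A minimal, hedged sketch of that HBase client pattern follows; the table handle, row key, family, qualifier, and values are hypothetical, and only the API calls mirror what the patch does:]

import java.io.IOException;

import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.util.Bytes;

public class CheckAndMutateSketch {
    // Atomically overwrite a timestamp-like cell, but only while it is still
    // non-zero. A concurrent writer that resets the cell to 0 (e.g. the index
    // being marked ACTIVE again) makes the guard fail, so the update is skipped.
    static boolean updateWhileNonZero(HTableInterface table, byte[] row, long newValue)
            throws IOException {
        byte[] family = Bytes.toBytes("0");      // hypothetical column family
        byte[] qualifier = Bytes.toBytes("DTS"); // hypothetical qualifier
        Put put = new Put(row);
        put.add(family, qualifier, Bytes.toBytes(newValue));
        RowMutations mutations = new RowMutations(row);
        mutations.add(put);
        // Apply only if the current value != 0, the same CompareOp.NOT_EQUAL
        // guard the patch uses against INDEX_DISABLE_TIMESTAMP below.
        return table.checkAndMutate(row, family, qualifier, CompareOp.NOT_EQUAL,
                Bytes.toBytes(0L), mutations);
    }
}

The comparison and the row mutations are applied under the server-side row lock, which is presumably why the rebuild task can race safely with updateIndexState() resetting the timestamp to 0.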
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd5ab4fb/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
index 1fb8221..7e4f1a9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
@@ -20,7 +20,6 @@ package org.apache.phoenix.coprocessor;
 import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
 
 import java.io.IOException;
-import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -42,11 +41,13 @@ import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
@@ -73,7 +74,6 @@ import org.apache.phoenix.schema.MetaDataClient;
 import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.types.PLong;
@@ -99,7 +99,6 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
 protected ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
 private boolean enableRebuildIndex = QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD;
 private long rebuildIndexTimeInterval = QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_INTERVAL;
- private boolean blockWriteRebuildIndex = false;
 private static Map<PName, Long> batchExecutedPerTableMap = new HashMap<PName, Long>();
 
 @Override
@@ -108,13 +107,13 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
 executor.shutdownNow();
 GlobalCache.getInstance(c.getEnvironment()).getMetaDataCache().invalidateAll();
 }
-
+
 @Override
 public void start(CoprocessorEnvironment env) throws IOException {
- // sleep a little bit to compensate time clock skew when SYSTEM.CATALOG moves
+ // sleep a little bit to compensate for clock skew when SYSTEM.CATALOG moves
 // among region servers because we rely on the server time of the RS which is hosting
 // SYSTEM.CATALOG
- long sleepTime = env.getConfiguration().getLong(QueryServices.CLOCK_SKEW_INTERVAL_ATTRIB,
+ long sleepTime = env.getConfiguration().getLong(QueryServices.CLOCK_SKEW_INTERVAL_ATTRIB,
 QueryServicesOptions.DEFAULT_CLOCK_SKEW_INTERVAL);
 try {
 if(sleepTime > 0) {
@@ -123,12 +122,10 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
 } catch (InterruptedException ie) {
Thread.currentThread().interrupt(); } - enableRebuildIndex = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB, + enableRebuildIndex = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB, QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD); - rebuildIndexTimeInterval = env.getConfiguration().getLong(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB, + rebuildIndexTimeInterval = env.getConfiguration().getLong(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB, QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_INTERVAL); - blockWriteRebuildIndex = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_BLOCK_WRITE, - QueryServicesOptions.DEFAULT_INDEX_FAILURE_BLOCK_WRITE); } @@ -171,7 +168,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver { t.setDaemon(true); t.start(); - if (!enableRebuildIndex && !blockWriteRebuildIndex) { + if (!enableRebuildIndex) { LOG.info("Failure Index Rebuild is skipped by configuration."); return; } @@ -190,7 +187,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver { LOG.error("BuildIndexScheduleTask cannot start!", ex); } } - + /** * Task runs periodically to build indexes whose INDEX_NEED_PARTIALLY_REBUILD is set true * @@ -205,8 +202,8 @@ public class MetaDataRegionObserver extends BaseRegionObserver { public BuildIndexScheduleTask(RegionCoprocessorEnvironment env) { this.env = env; - this.rebuildIndexBatchSize = env.getConfiguration() - .getLong(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_PERIOD, HConstants.LATEST_TIMESTAMP); + this.rebuildIndexBatchSize = env.getConfiguration().getLong( + QueryServices.INDEX_FAILURE_HANDLING_REBUILD_PERIOD, HConstants.LATEST_TIMESTAMP); this.configuredBatches = env.getConfiguration().getLong( QueryServices.INDEX_FAILURE_HANDLING_REBUILD_NUMBER_OF_BATCHES_PER_TABLE, configuredBatches); } @@ -227,20 +224,20 @@ public class MetaDataRegionObserver extends BaseRegionObserver { try { Scan scan = new Scan(); SingleColumnValueFilter filter = new SingleColumnValueFilter(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, - PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES, CompareFilter.CompareOp.GREATER, - PLong.INSTANCE.toBytes(0L)); + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES, + CompareFilter.CompareOp.NOT_EQUAL, PLong.INSTANCE.toBytes(0L)); filter.setFilterIfMissing(true); scan.setFilter(filter); - scan.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.TABLE_NAME_BYTES); scan.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, - PhoenixDatabaseMetaData.DATA_TABLE_NAME_BYTES); - scan.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES); + PhoenixDatabaseMetaData.TABLE_NAME_BYTES); scan.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, - PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES); - PreparedStatement updateDisabledTimeStampSmt = null; + PhoenixDatabaseMetaData.DATA_TABLE_NAME_BYTES); + scan.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, + PhoenixDatabaseMetaData.INDEX_STATE_BYTES); + scan.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES); Map<PTable, List<PTable>> dataTableToIndexesMap = null; - MetaDataClient client = null; boolean hasMore = false; List<Cell> results = new ArrayList<Cell>(); scanner = this.env.getRegion().getScanner(scan); @@ -252,26 +249,17 @@ public class MetaDataRegionObserver extends 
BaseRegionObserver { Result r = Result.create(results); byte[] disabledTimeStamp = r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, - PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES); + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES); byte[] indexState = r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES); - if (disabledTimeStamp == null || disabledTimeStamp.length == 0 || (indexState != null - && PIndexState.BUILDING == PIndexState.fromSerializedValue(Bytes.toString(indexState)))) { - // Don't rebuild the building index , because they are marked for aysnc + if (disabledTimeStamp == null || disabledTimeStamp.length == 0) { continue; } - // disableTimeStamp has to be a positive value - long disabledTimeStampVal = PLong.INSTANCE.getCodec().decodeLong(disabledTimeStamp, 0, - SortOrder.getDefault()); - if (disabledTimeStampVal <= 0) { - continue; - } byte[] dataTable = r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, - PhoenixDatabaseMetaData.DATA_TABLE_NAME_BYTES); - if ((dataTable == null || dataTable.length == 0) - || (indexState == null || indexState.length == 0)) { + PhoenixDatabaseMetaData.DATA_TABLE_NAME_BYTES); + if ((dataTable == null || dataTable.length == 0) || (indexState == null || indexState.length == 0)) { // data table name can't be empty continue; } @@ -289,21 +277,18 @@ public class MetaDataRegionObserver extends BaseRegionObserver { } if (conn == null) { - final Properties props = new Properties(); - // Set SCN so that we don't ping server and have the upper bound set back to - // the timestamp when the failure occurred. - props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(Long.MAX_VALUE)); - - // Set timeout to max value as rebuilding may take time - props.setProperty(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, Long.toString(Long.MAX_VALUE)); - props.setProperty(QueryServices.HBASE_CLIENT_SCANNER_TIMEOUT_ATTRIB, - Long.toString(Long.MAX_VALUE)); - props.setProperty(QueryServices.RPC_TIMEOUT_ATTRIB, Long.toString(Long.MAX_VALUE)); - // don't run a second index populations upsert select - props.setProperty(QueryServices.INDEX_POPULATION_SLEEP_TIME, "0"); - conn = QueryUtil.getConnectionOnServer(props, env.getConfiguration()) - .unwrap(PhoenixConnection.class); - client = new MetaDataClient(conn); + final Properties props = new Properties(); + // Set SCN so that we don't ping server and have the upper bound set back to + // the timestamp when the failure occurred. 
+ props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(Long.MAX_VALUE));
+
+ // Set timeout to max value as rebuilding may take time
+ props.setProperty(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, Long.toString(Long.MAX_VALUE));
+ props.setProperty(QueryServices.HBASE_CLIENT_SCANNER_TIMEOUT_ATTRIB, Long.toString(Long.MAX_VALUE));
+ props.setProperty(QueryServices.RPC_TIMEOUT_ATTRIB, Long.toString(Long.MAX_VALUE));
+ // don't run a second index population upsert select
+ props.setProperty(QueryServices.INDEX_POPULATION_SLEEP_TIME, "0");
+ conn = QueryUtil.getConnectionOnServer(props, env.getConfiguration()).unwrap(PhoenixConnection.class);
 dataTableToIndexesMap = Maps.newHashMap();
 }
 String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTable);
@@ -315,7 +300,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
 if (!dataPTable.getIndexes().contains(indexPTable)) {
 continue;
 }
-
+
 if (!MetaDataUtil.tableRegionsOnline(this.env.getConfiguration(), indexPTable)) {
 LOG.debug("Index rebuild has been skipped because not all regions of index table=" + indexPTable.getName() + " are online.");
@@ -332,173 +317,198 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
 dataTableToIndexesMap.put(dataPTable, indexesToPartiallyRebuild);
 }
 LOG.debug("We have found " + indexPTable.getIndexState() + " Index:" + indexPTable.getName()
- + " on data table:" + dataPTable.getName() + " which was disabled at "
+ + " on data table:" + dataPTable.getName() + " which failed to be updated at "
 + indexPTable.getIndexDisableTimestamp());
 indexesToPartiallyRebuild.add(indexPTable);
 } while (hasMore);
- if (dataTableToIndexesMap != null) {
- long overlapTime = env.getConfiguration().getLong(
- QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB,
- QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME);
- for (Map.Entry<PTable, List<PTable>> entry : dataTableToIndexesMap.entrySet()) {
- PTable dataPTable = entry.getKey();
- List<PTable> indexesToPartiallyRebuild = entry.getValue();
- ReadOnlyProps props = new ReadOnlyProps(env.getConfiguration().iterator());
- try (HTableInterface metaTable = env.getTable(
- SchemaUtil.getPhysicalName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, props))) {
- long earliestDisableTimestamp = Long.MAX_VALUE;
- List<IndexMaintainer> maintainers = Lists
- .newArrayListWithExpectedSize(indexesToPartiallyRebuild.size());
- for (PTable index : indexesToPartiallyRebuild) {
- long disabledTimeStampVal = index.getIndexDisableTimestamp();
- if (disabledTimeStampVal > 0) {
- if (disabledTimeStampVal < earliestDisableTimestamp) {
- earliestDisableTimestamp = disabledTimeStampVal;
- }
- maintainers.add(index.getIndexMaintainer(dataPTable, conn));
- }
- }
- // No indexes are disabled, so skip this table
- if (earliestDisableTimestamp == Long.MAX_VALUE) {
- continue;
- }
- long timeStamp = Math.max(0, earliestDisableTimestamp - overlapTime);
- LOG.info("Starting to build " + dataPTable + " indexes " + indexesToPartiallyRebuild
- + " from timestamp=" + timeStamp);
-
- TableRef tableRef = new TableRef(null, dataPTable, HConstants.LATEST_TIMESTAMP, false);
- // TODO Need to set high timeout
- PostDDLCompiler compiler = new PostDDLCompiler(conn);
- MutationPlan plan = compiler.compile(Collections.singletonList(tableRef), null, null, null,
- HConstants.LATEST_TIMESTAMP);
- Scan dataTableScan = IndexManagementUtil.newLocalStateScan(plan.getContext().getScan(),
- maintainers);
-
- long scanEndTime =
getTimestampForBatch(timeStamp, - batchExecutedPerTableMap.get(dataPTable.getName())); - dataTableScan.setTimeRange(timeStamp, scanEndTime); - dataTableScan.setCacheBlocks(false); - dataTableScan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES, TRUE_BYTES); - - ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable( - ByteUtil.EMPTY_BYTE_ARRAY); - IndexMaintainer.serializeAdditional(dataPTable, indexMetaDataPtr, indexesToPartiallyRebuild, - conn); - byte[] attribValue = ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr); - dataTableScan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, attribValue); - MutationState mutationState = plan.execute(); - long rowCount = mutationState.getUpdateCount(); - LOG.info(rowCount + " rows of index which are rebuild"); - for (PTable indexPTable : indexesToPartiallyRebuild) { - String indexTableFullName = SchemaUtil.getTableName( - indexPTable.getSchemaName().getString(), - indexPTable.getTableName().getString()); - if (scanEndTime == HConstants.LATEST_TIMESTAMP) { - updateIndexState(conn, indexTableFullName, env, PIndexState.INACTIVE, - PIndexState.ACTIVE); - batchExecutedPerTableMap.remove(dataPTable.getName()); - } else { - - updateDisableTimestamp(conn, indexTableFullName, env, scanEndTime, metaTable); - Long noOfBatches = batchExecutedPerTableMap.get(dataPTable.getName()); - if (noOfBatches == null) { - noOfBatches = 0l; + if (dataTableToIndexesMap != null) { + long overlapTime = env.getConfiguration().getLong( + QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB, + QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME); + for (Map.Entry<PTable, List<PTable>> entry : dataTableToIndexesMap.entrySet()) { + PTable dataPTable = entry.getKey(); + List<PTable> indexesToPartiallyRebuild = entry.getValue(); + ReadOnlyProps props = new ReadOnlyProps(env.getConfiguration().iterator()); + try (HTableInterface metaTable = env.getTable( + SchemaUtil.getPhysicalName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, props))) { + long earliestDisableTimestamp = Long.MAX_VALUE; + List<IndexMaintainer> maintainers = Lists + .newArrayListWithExpectedSize(indexesToPartiallyRebuild.size()); + int signOfDisableTimeStamp = 0; + for (PTable index : indexesToPartiallyRebuild) { + // We need a way of differentiating the block writes to data table case from + // the leave index active case. In either case, we need to know the time stamp + // at which writes started failing so we can rebuild from that point. If we + // keep the index active *and* have a positive INDEX_DISABLE_TIMESTAMP_BYTES, + // then writes to the data table will be blocked (this is client side logic + // and we can't change this in a minor release). So we use the sign of the + // time stamp to differentiate. 
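// [Editor's sketch, illustrative only and not part of this patch. Given the
// sign convention described in the comment above, a stored
// INDEX_DISABLE_TIMESTAMP value can be decoded as:
//     long raw = index.getIndexDisableTimestamp();
//     int mode = Long.signum(raw);   // -1 => index left active, data table writes not blocked
//     long failTime = Math.abs(raw); // wall-clock time at which writes started failing
// which is what the loop below tracks via signOfDisableTimeStamp.]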
+ long disabledTimeStampVal = index.getIndexDisableTimestamp(); + if (disabledTimeStampVal != 0) { + if (signOfDisableTimeStamp != 0 && signOfDisableTimeStamp != Long.signum(disabledTimeStampVal)) { + LOG.warn("Found unexpected mix of signs with INDEX_DISABLE_TIMESTAMP for " + dataPTable.getName().getString() + " with " + indexesToPartiallyRebuild); } - batchExecutedPerTableMap.put(dataPTable.getName(), ++noOfBatches); - // clearing cache to get the updated - // disabled timestamp - new MetaDataClient(conn).updateCache(dataPTable.getSchemaName().getString(), - dataPTable.getTableName().getString()); - new MetaDataClient(conn).updateCache(indexPTable.getSchemaName().getString(), - indexPTable.getTableName().getString()); - LOG.info( - "During Round-robin build: Successfully updated index disabled timestamp for " - + indexTableFullName + " to " + scanEndTime); - } - + signOfDisableTimeStamp = Long.signum(disabledTimeStampVal); + disabledTimeStampVal = Math.abs(disabledTimeStampVal); + if (disabledTimeStampVal < earliestDisableTimestamp) { + earliestDisableTimestamp = disabledTimeStampVal; + } + + maintainers.add(index.getIndexMaintainer(dataPTable, conn)); + } + } + // No indexes are disabled, so skip this table + if (earliestDisableTimestamp == Long.MAX_VALUE) { + continue; + } + long timeStamp = Math.max(0, earliestDisableTimestamp - overlapTime); + LOG.info("Starting to build " + dataPTable + " indexes " + indexesToPartiallyRebuild + + " from timestamp=" + timeStamp); + + TableRef tableRef = new TableRef(null, dataPTable, HConstants.LATEST_TIMESTAMP, false); + // TODO Need to set high timeout + PostDDLCompiler compiler = new PostDDLCompiler(conn); + MutationPlan plan = compiler.compile(Collections.singletonList(tableRef), null, null, null, + HConstants.LATEST_TIMESTAMP); + Scan dataTableScan = IndexManagementUtil.newLocalStateScan(plan.getContext().getScan(), + maintainers); + + long scanEndTime = getTimestampForBatch(timeStamp, + batchExecutedPerTableMap.get(dataPTable.getName())); + + dataTableScan.setTimeRange(timeStamp, scanEndTime); + dataTableScan.setCacheBlocks(false); + dataTableScan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES, TRUE_BYTES); + + ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable( + ByteUtil.EMPTY_BYTE_ARRAY); + IndexMaintainer.serializeAdditional(dataPTable, indexMetaDataPtr, indexesToPartiallyRebuild, + conn); + byte[] attribValue = ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr); + dataTableScan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, attribValue); + LOG.info("Starting to partially build indexes:" + indexesToPartiallyRebuild + + " on data table:" + dataPTable.getName() + " with the earliest disable timestamp:" + + earliestDisableTimestamp + " till " + + (scanEndTime == HConstants.LATEST_TIMESTAMP ? "LATEST_TIMESTAMP" : scanEndTime)); + MutationState mutationState = plan.execute(); + long rowCount = mutationState.getUpdateCount(); + if (scanEndTime == HConstants.LATEST_TIMESTAMP) { + LOG.info("Rebuild completed for all inactive/disabled indexes in data table:" + + dataPTable.getName()); } - } catch (Exception e) { // Log, but try next table's - // indexes - LOG.warn("Unable to rebuild " + dataPTable + " indexes " + indexesToPartiallyRebuild - + ". 
Will try again next on next scheduled invocation.", e); - } - } - } - } catch (Throwable t) { - LOG.warn("ScheduledBuildIndexTask failed!", t); - } finally { - inProgress.decrementAndGet(); - if (scanner != null) { - try { - scanner.close(); - } catch (IOException ignored) { - LOG.debug("ScheduledBuildIndexTask can't close scanner.", ignored); - } - } - if (conn != null) { - try { - conn.close(); - } catch (SQLException ignored) { - LOG.debug("ScheduledBuildIndexTask can't close connection", ignored); - } - } - } - + LOG.info(" no. of datatable rows read in rebuilding process is " + rowCount); + for (PTable indexPTable : indexesToPartiallyRebuild) { + String indexTableFullName = SchemaUtil.getTableName( + indexPTable.getSchemaName().getString(), + indexPTable.getTableName().getString()); + if (scanEndTime == HConstants.LATEST_TIMESTAMP) { + updateIndexState(conn, indexTableFullName, env, PIndexState.INACTIVE, + PIndexState.ACTIVE); + batchExecutedPerTableMap.remove(dataPTable.getName()); + LOG.info("Making Index:" + indexPTable.getTableName() + " active after rebuilding"); + } else { + // Maintain sign of INDEX_DISABLE_TIMESTAMP (see comment above) + updateDisableTimestamp(conn, indexTableFullName, env, scanEndTime * signOfDisableTimeStamp, metaTable); + Long noOfBatches = batchExecutedPerTableMap.get(dataPTable.getName()); + if (noOfBatches == null) { + noOfBatches = 0l; + } + batchExecutedPerTableMap.put(dataPTable.getName(), ++noOfBatches); + // clearing cache to get the updated + // disabled timestamp + new MetaDataClient(conn).updateCache(dataPTable.getSchemaName().getString(), + dataPTable.getTableName().getString()); + new MetaDataClient(conn).updateCache(indexPTable.getSchemaName().getString(), + indexPTable.getTableName().getString()); + LOG.info( + "During Round-robin build: Successfully updated index disabled timestamp for " + + indexTableFullName + " to " + scanEndTime); + } + + } + } catch (Exception e) { // Log, but try next table's + // indexes + LOG.warn("Unable to rebuild " + dataPTable + " indexes " + indexesToPartiallyRebuild + + ". 
Will try again next on next scheduled invocation.", e); + } + } + } + } catch (Throwable t) { + LOG.warn("ScheduledBuildIndexTask failed!", t); + } finally { + inProgress.decrementAndGet(); + if (scanner != null) { + try { + scanner.close(); + } catch (IOException ignored) { + LOG.debug("ScheduledBuildIndexTask can't close scanner.", ignored); + } + } + if (conn != null) { + try { + conn.close(); + } catch (SQLException ignored) { + LOG.debug("ScheduledBuildIndexTask can't close connection", ignored); + } + } + } } private long getTimestampForBatch(long disabledTimeStamp, Long noOfBatches) { if (disabledTimeStamp < 0 || rebuildIndexBatchSize > (HConstants.LATEST_TIMESTAMP - disabledTimeStamp)) { return HConstants.LATEST_TIMESTAMP; } long timestampForNextBatch = disabledTimeStamp + rebuildIndexBatchSize; - if (timestampForNextBatch < 0 || timestampForNextBatch > System.currentTimeMillis() - || (noOfBatches != null && noOfBatches > configuredBatches)) { - // if timestampForNextBatch cross current time , then we should - // build the complete index - timestampForNextBatch = HConstants.LATEST_TIMESTAMP; - } + if (timestampForNextBatch < 0 || timestampForNextBatch > System.currentTimeMillis() + || (noOfBatches != null && noOfBatches > configuredBatches)) { + // if timestampForNextBatch cross current time , then we should + // build the complete index + timestampForNextBatch = HConstants.LATEST_TIMESTAMP; + } return timestampForNextBatch; } - - private static void updateIndexState(PhoenixConnection conn, String indexTableName, - RegionCoprocessorEnvironment env, PIndexState oldState, PIndexState newState) - throws ServiceException, Throwable { - byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(indexTableName); - String schemaName = SchemaUtil.getSchemaNameFromFullName(indexTableName); - String indexName = SchemaUtil.getTableNameFromFullName(indexTableName); - // Mimic the Put that gets generated by the client on an update of the - // index state - Put put = new Put(indexTableKey); - put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES, - newState.getSerializedBytes()); - if (newState == PIndexState.ACTIVE) { - put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, - PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES, PLong.INSTANCE.toBytes(0)); - put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, - PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP_BYTES, PLong.INSTANCE.toBytes(0)); - } - final List<Mutation> tableMetadata = Collections.<Mutation> singletonList(put); - MetaDataMutationResult result = conn.getQueryServices().updateIndexState(tableMetadata, null); - MutationCode code = result.getMutationCode(); - if (code == MutationCode.TABLE_NOT_FOUND) { throw new TableNotFoundException(schemaName, indexName); } - if (code == MutationCode.UNALLOWED_TABLE_MUTATION) { throw new SQLExceptionInfo.Builder( - SQLExceptionCode.INVALID_INDEX_STATE_TRANSITION) - .setMessage(" currentState=" + oldState + ". 
requestedState=" + newState) - .setSchemaName(schemaName).setTableName(indexName).build().buildException(); } - } - - private static void updateDisableTimestamp(PhoenixConnection conn, String indexTableName, - RegionCoprocessorEnvironment env, long disabledTimestamp, HTableInterface metaTable) - throws IOException { - byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(indexTableName); - Put put = new Put(indexTableKey); - put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, - PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES, PLong.INSTANCE.toBytes(disabledTimestamp)); - metaTable.checkAndPut(indexTableKey, PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, - PhoenixDatabaseMetaData.INDEX_STATE_BYTES, - PIndexState.INACTIVE.getSerializedBytes(), put); - - } - } -} + + private static void updateIndexState(PhoenixConnection conn, String indexTableName, + RegionCoprocessorEnvironment env, PIndexState oldState, PIndexState newState) + throws ServiceException, Throwable { + byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(indexTableName); + String schemaName = SchemaUtil.getSchemaNameFromFullName(indexTableName); + String indexName = SchemaUtil.getTableNameFromFullName(indexTableName); + // Mimic the Put that gets generated by the client on an update of the + // index state + Put put = new Put(indexTableKey); + put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES, + newState.getSerializedBytes()); + if (newState == PIndexState.ACTIVE) { + put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES, PLong.INSTANCE.toBytes(0)); + put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, + PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP_BYTES, PLong.INSTANCE.toBytes(0)); + } + final List<Mutation> tableMetadata = Collections.<Mutation> singletonList(put); + MetaDataMutationResult result = conn.getQueryServices().updateIndexState(tableMetadata, null); + MutationCode code = result.getMutationCode(); + if (code == MutationCode.TABLE_NOT_FOUND) { + throw new TableNotFoundException(schemaName, indexName); + } + if (code == MutationCode.UNALLOWED_TABLE_MUTATION) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_INDEX_STATE_TRANSITION) + .setMessage(" currentState=" + oldState + ". 
requestedState=" + newState).setSchemaName(schemaName) + .setTableName(indexName).build().buildException(); + } + } + + private static void updateDisableTimestamp(PhoenixConnection conn, String indexTableName, + RegionCoprocessorEnvironment env, long disabledTimestamp, HTableInterface metaTable) throws IOException { + byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(indexTableName); + Put put = new Put(indexTableKey); + put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES, + PLong.INSTANCE.toBytes(disabledTimestamp)); + RowMutations rowMutations = new RowMutations(indexTableKey); + rowMutations.add(put); + metaTable.checkAndMutate(indexTableKey, PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES, CompareOp.NOT_EQUAL, PLong.INSTANCE.toBytes(0), + rowMutations); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd5ab4fb/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index 23b8be0..c9866c2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -378,6 +378,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver RegionScanner theScanner = s; + boolean replayMutations = scan.getAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS) != null; byte[] indexUUID = scan.getAttribute(PhoenixIndexCodec.INDEX_UUID); byte[] txState = scan.getAttribute(BaseScannerRegionObserver.TX_STATE); List<Expression> selectExpressions = null; @@ -609,6 +610,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver Cell firstKV = results.get(0); Delete delete = new Delete(firstKV.getRowArray(), firstKV.getRowOffset(), firstKV.getRowLength(),ts); + if (replayMutations) { + delete.setAttribute(IGNORE_NEWER_MUTATIONS, PDataType.TRUE_BYTES); + } mutations.add(delete); // force tephra to ignore this deletes delete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]); @@ -660,6 +664,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } } for (Mutation mutation : row.toRowMutations()) { + if (replayMutations) { + mutation.setAttribute(IGNORE_NEWER_MUTATIONS, PDataType.TRUE_BYTES); + } mutations.add(mutation); } for (i = 0; i < selectExpressions.size(); i++) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd5ab4fb/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java index 2836c45..35ba187 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java @@ -182,6 +182,8 @@ public enum SQLExceptionCode { ROWTIMESTAMP_COL_INVALID_TYPE(530, "42907", "A column can be added as ROW_TIMESTAMP only if it is of type DATE, BIGINT, TIME 
OR TIMESTAMP."), ROWTIMESTAMP_NOT_ALLOWED_ON_VIEW(531, "42908", "Declaring a column as row_timestamp is not allowed for views."), INVALID_SCN(532, "42909", "Value of SCN cannot be less than zero."), + INVALID_REPLAY_AT(533, "42910", "Value of REPLAY_AT cannot be less than zero."), + UNEQUAL_SCN_AND_REPLAY_AT(534, "42911", "If both specified, values of CURRENT_SCN and REPLAY_AT must be equal."), /** * HBase and Phoenix specific implementation defined sub-classes. * Column family related exceptions. http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd5ab4fb/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java index a9d8311..b0d22d3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java @@ -24,10 +24,16 @@ import org.apache.phoenix.jdbc.PhoenixConnection; public class CommitException extends SQLException { private static final long serialVersionUID = 2L; private final int[] uncommittedStatementIndexes; + private final long serverTimestamp; - public CommitException(Exception e, int[] uncommittedStatementIndexes) { + public CommitException(Exception e, int[] uncommittedStatementIndexes, long serverTimestamp) { super(e); this.uncommittedStatementIndexes = uncommittedStatementIndexes; + this.serverTimestamp = serverTimestamp; + } + + public long getServerTimestamp() { + return this.serverTimestamp; } /** http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd5ab4fb/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index a04725c..aa6c195 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -80,6 +80,7 @@ import org.apache.phoenix.schema.RowKeySchema; import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.ValueSchema.Field; +import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.schema.types.PLong; import org.apache.phoenix.trace.util.Tracing; import org.apache.phoenix.util.ByteUtil; @@ -673,6 +674,14 @@ public class MutationState implements SQLCloseable { rowMutationsPertainingToIndex = rowMutations; } mutationList.addAll(rowMutations); + if (connection.isReplayMutations()) { + // Propagate IGNORE_NEWER_MUTATIONS when replaying mutations since there will be + // future dated data row mutations that will get in the way of generating the + // correct index rows on replay. 
+ for (Mutation mutation : rowMutations) { + mutation.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS, PDataType.TRUE_BYTES); + } + } if (mutationsPertainingToIndex != null) mutationsPertainingToIndex .addAll(rowMutationsPertainingToIndex); } @@ -1030,6 +1039,7 @@ public class MutationState implements SQLCloseable { joinMutationState(new TableRef(tableRef), valuesMap, txMutations); } } + long serverTimestamp = HConstants.LATEST_TIMESTAMP; Iterator<Entry<TableInfo, List<Mutation>>> mutationsIterator = physicalTableMutationMap.entrySet().iterator(); while (mutationsIterator.hasNext()) { Entry<TableInfo, List<Mutation>> pair = mutationsIterator.next(); @@ -1106,6 +1116,7 @@ public class MutationState implements SQLCloseable { // Remove batches as we process them mutations.remove(origTableRef); } catch (Exception e) { + serverTimestamp = ServerUtil.parseServerTimestamp(e); SQLException inferredE = ServerUtil.parseServerExceptionOrNull(e); if (inferredE != null) { if (shouldRetry && retryCount == 0 && inferredE.getErrorCode() == SQLExceptionCode.INDEX_METADATA_NOT_FOUND.getErrorCode()) { @@ -1127,7 +1138,7 @@ public class MutationState implements SQLCloseable { } // Throw to client an exception that indicates the statements that // were not committed successfully. - sqlE = new CommitException(e, getUncommittedStatementIndexes()); + sqlE = new CommitException(e, getUncommittedStatementIndexes(), serverTimestamp); } finally { try { if (cache!=null) http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd5ab4fb/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java index 831aa16..a037e92 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java @@ -61,6 +61,10 @@ public class IndexWriter implements Stoppable { this(getCommitter(env), getFailurePolicy(env), env, name); } + public IndexWriter(IndexFailurePolicy failurePolicy, RegionCoprocessorEnvironment env, String name) throws IOException { + this(getCommitter(env), failurePolicy, env, name); + } + public static IndexCommitter getCommitter(RegionCoprocessorEnvironment env) throws IOException { Configuration conf = env.getConfiguration(); try { http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd5ab4fb/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/LeaveIndexActiveFailurePolicy.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/LeaveIndexActiveFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/LeaveIndexActiveFailurePolicy.java new file mode 100644 index 0000000..edacd3a --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/LeaveIndexActiveFailurePolicy.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.hbase.index.write; + +import java.io.IOException; + +import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.phoenix.hbase.index.table.HTableInterfaceReference; +import org.apache.phoenix.util.ServerUtil; + +import com.google.common.collect.Multimap; + +/** + * + * Implementation of IndexFailurePolicy which takes no action when an + * index cannot be updated. As with the standard flow of control, an + * exception will still be thrown back to the client. Using this failure + * policy means that the action to take upon failure is completely up + * to the client. + * + */ +public class LeaveIndexActiveFailurePolicy implements IndexFailurePolicy { + + @Override + public boolean isStopped() { + return false; + } + + @Override + public void stop(String arg0) { + } + + @Override + public void setup(Stoppable parent, RegionCoprocessorEnvironment env) { + } + + @Override + public void handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted, Exception cause) + throws IOException { + // get timestamp of first cell + long ts = attempted.values().iterator().next().getFamilyCellMap().values().iterator().next().get(0).getTimestamp(); + throw ServerUtil.wrapInDoNotRetryIOException("Unable to update the following indexes: " + attempted.keySet(), cause, ts); + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd5ab4fb/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java index 5da8be8..842e881 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java @@ -31,7 +31,9 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Mutation; @@ -67,7 +69,13 @@ import com.google.common.collect.Multimap; */ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy { private static final Log LOG = LogFactory.getLog(PhoenixIndexFailurePolicy.class); + public static final String DISABLE_INDEX_ON_WRITE_FAILURE = "DISABLE_INDEX_ON_WRITE_FAILURE"; + public static final String REBUILD_INDEX_ON_WRITE_FAILURE = "REBUILD_INDEX_ON_WRITE_FAILURE"; + public static final String BLOCK_DATA_TABLE_WRITES_ON_WRITE_FAILURE = "BLOCK_DATA_TABLE_WRITES_ON_WRITE_FAILURE"; private RegionCoprocessorEnvironment env; + private boolean 
blockDataTableWritesOnFailure; + private boolean disableIndexOnFailure; + private boolean rebuildIndexOnFailure; public PhoenixIndexFailurePolicy() { super(new KillServerOnFailurePolicy()); @@ -77,6 +85,31 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy { public void setup(Stoppable parent, RegionCoprocessorEnvironment env) { super.setup(parent, env); this.env = env; + rebuildIndexOnFailure = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB, + QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD); + HTableDescriptor htd = env.getRegion().getTableDesc(); + // If rebuild index is turned off globally, no need to check the table because the background thread + // won't be running in this case + if (rebuildIndexOnFailure) { + String value = htd.getValue(REBUILD_INDEX_ON_WRITE_FAILURE); + if (value != null) { + rebuildIndexOnFailure = Boolean.parseBoolean(value); + } + } + String value = htd.getValue(DISABLE_INDEX_ON_WRITE_FAILURE); + if (value == null) { + disableIndexOnFailure = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_DISABLE_INDEX, + QueryServicesOptions.DEFAULT_INDEX_FAILURE_DISABLE_INDEX); + } else { + disableIndexOnFailure = Boolean.parseBoolean(value); + } + value = htd.getValue(BLOCK_DATA_TABLE_WRITES_ON_WRITE_FAILURE); + if (value == null) { + blockDataTableWritesOnFailure = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_BLOCK_WRITE, + QueryServicesOptions.DEFAULT_INDEX_FAILURE_BLOCK_WRITE); + } else { + blockDataTableWritesOnFailure = Boolean.parseBoolean(value); + } } /** @@ -91,30 +124,34 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy { */ @Override public void handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted, Exception cause) throws IOException { - boolean blockWriteRebuildIndex = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_BLOCK_WRITE, - QueryServicesOptions.DEFAULT_INDEX_FAILURE_BLOCK_WRITE); boolean throwing = true; + long timestamp = HConstants.LATEST_TIMESTAMP; try { - handleFailureWithExceptions(attempted, cause, blockWriteRebuildIndex); + timestamp = handleFailureWithExceptions(attempted, cause); throwing = false; } catch (Throwable t) { LOG.warn("handleFailure failed", t); super.handleFailure(attempted, cause); throwing = false; } finally { - if (!throwing) throw ServerUtil.createIOException(null, cause); + if (!throwing) { + throw ServerUtil.wrapInDoNotRetryIOException("Unable to update the following indexes: " + attempted.keySet(), cause, timestamp); + } } } - private void handleFailureWithExceptions(Multimap<HTableInterfaceReference, Mutation> attempted, - Exception cause, boolean blockWriteRebuildIndex) throws Throwable { + private long handleFailureWithExceptions(Multimap<HTableInterfaceReference, Mutation> attempted, + Exception cause) throws Throwable { Set<HTableInterfaceReference> refs = attempted.asMap().keySet(); Map<String, Long> indexTableNames = new HashMap<String, Long>(refs.size()); // start by looking at all the tables to which we attempted to write + long timestamp = 0; + boolean leaveIndexActive = blockDataTableWritesOnFailure || !disableIndexOnFailure; for (HTableInterfaceReference ref : refs) { long minTimeStamp = 0; // get the minimum timestamp across all the mutations we attempted on that table + // FIXME: all cell timestamps should be the same Collection<Mutation> mutations = attempted.get(ref); if (mutations != null) { for (Mutation m : mutations) { @@ -127,6 +164,7 @@ public 
class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy { } } } + timestamp = minTimeStamp; // If the data table has local index column families then get local indexes to disable. if (ref.getTableName().equals(env.getRegion().getTableDesc().getNameAsString()) @@ -139,37 +177,59 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy { } } + // Nothing to do if we're not disabling the index and not rebuilding on failure + if (!disableIndexOnFailure && !rebuildIndexOnFailure) { + return timestamp; + } + + PIndexState newState = disableIndexOnFailure ? PIndexState.DISABLE : PIndexState.ACTIVE; // for all the index tables that we've found, try to disable them and if that fails, try to for (Map.Entry<String, Long> tableTimeElement :indexTableNames.entrySet()){ String indexTableName = tableTimeElement.getKey(); long minTimeStamp = tableTimeElement.getValue(); + // We need a way of differentiating the block writes to data table case from + // the leave index active case. In either case, we need to know the time stamp + // at which writes started failing so we can rebuild from that point. If we + // keep the index active *and* have a positive INDEX_DISABLE_TIMESTAMP_BYTES, + // then writes to the data table will be blocked (this is client side logic + // and we can't change this in a minor release). So we use the sign of the + // time stamp to differentiate. + if (!disableIndexOnFailure && !blockDataTableWritesOnFailure) { + minTimeStamp *= -1; + } // Disable the index by using the updateIndexState method of MetaDataProtocol end point coprocessor. HTableInterface systemTable = env.getTable(SchemaUtil .getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, env.getConfiguration())); - MetaDataMutationResult result = IndexUtil.disableIndexWithTimestamp(indexTableName, minTimeStamp, - systemTable, blockWriteRebuildIndex); + MetaDataMutationResult result = IndexUtil.setIndexDisableTimeStamp(indexTableName, minTimeStamp, + systemTable, newState); if (result.getMutationCode() == MutationCode.TABLE_NOT_FOUND) { LOG.info("Index " + indexTableName + " has been dropped. Ignore uncommitted mutations"); continue; } if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) { - if (blockWriteRebuildIndex) { + if (leaveIndexActive) { LOG.warn("Attempt to update INDEX_DISABLE_TIMESTAMP " + " failed with code = " + result.getMutationCode()); - throw new DoNotRetryIOException("Attempt to update INDEX_DISABLE_TIMESTAMP failed."); + // If we're not disabling the index, then we don't want to throw as throwing + // will lead to the RS being shutdown. + if (blockDataTableWritesOnFailure) { + throw new DoNotRetryIOException("Attempt to update INDEX_DISABLE_TIMESTAMP failed."); + } } else { LOG.warn("Attempt to disable index " + indexTableName + " failed with code = " + result.getMutationCode() + ". Will use default failure policy instead."); throw new DoNotRetryIOException("Attempt to disable " + indexTableName + " failed."); } } - if (blockWriteRebuildIndex) + if (leaveIndexActive) LOG.info("Successfully update INDEX_DISABLE_TIMESTAMP for " + indexTableName + " due to an exception while writing updates.", cause); else LOG.info("Successfully disabled index " + indexTableName + " due to an exception while writing updates.", cause); } + // Return the cell time stamp (note they should all be the same) + return timestamp; } private Collection<? 
extends String> getLocalIndexNames(HTableInterfaceReference ref, http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd5ab4fb/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java index b794ddb..2599f59 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java @@ -66,6 +66,7 @@ import org.apache.phoenix.hbase.index.covered.update.ColumnTracker; import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.hbase.index.write.IndexWriter; +import org.apache.phoenix.hbase.index.write.LeaveIndexActiveFailurePolicy; import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.schema.types.PVarbinary; import org.apache.phoenix.trace.TracingUtils; @@ -111,7 +112,9 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { codec.initialize(env); // setup the actual index writer - this.writer = new IndexWriter(env, serverName + "-tx-index-writer"); + // For transactional tables, we keep the index active upon a write failure + // since we have the all versus none behavior for transactions. + this.writer = new IndexWriter(new LeaveIndexActiveFailurePolicy(), env, serverName + "-tx-index-writer"); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd5ab4fb/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java index b489583..5af418d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java @@ -143,8 +143,9 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea private final int mutateBatchSize; private final long mutateBatchSizeBytes; private final Long scn; + private final boolean replayMutations; private MutationState mutationState; - private List<SQLCloseable> statements = new ArrayList<SQLCloseable>(); + private List<PhoenixStatement> statements = new ArrayList<>(); private boolean isAutoFlush = false; private boolean isAutoCommit = false; private PMetaData metaData; @@ -176,7 +177,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea } public PhoenixConnection(PhoenixConnection connection, boolean isDescRowKeyOrderUpgrade, boolean isRunningUpgrade) throws SQLException { - this(connection.getQueryServices(), connection.getURL(), connection.getClientInfo(), connection.metaData, connection.getMutationState(), isDescRowKeyOrderUpgrade, isRunningUpgrade); + this(connection.getQueryServices(), connection.getURL(), connection.getClientInfo(), connection.metaData, connection.getMutationState(), isDescRowKeyOrderUpgrade, isRunningUpgrade, connection.replayMutations); this.isAutoCommit = connection.isAutoCommit; this.isAutoFlush = connection.isAutoFlush; this.sampler = connection.sampler; @@ -188,7 +189,7 @@ public class PhoenixConnection implements 
Connection, MetaDataMutated, SQLClosea } public PhoenixConnection(PhoenixConnection connection, MutationState mutationState) throws SQLException { - this(connection.getQueryServices(), connection.getURL(), connection.getClientInfo(), connection.getMetaDataCache(), mutationState, connection.isDescVarLengthRowKeyUpgrade(), connection.isRunningUpgrade()); + this(connection.getQueryServices(), connection.getURL(), connection.getClientInfo(), connection.getMetaDataCache(), mutationState, connection.isDescVarLengthRowKeyUpgrade(), connection.isRunningUpgrade(), connection.replayMutations); } public PhoenixConnection(PhoenixConnection connection, long scn) throws SQLException { @@ -196,7 +197,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea } public PhoenixConnection(ConnectionQueryServices services, PhoenixConnection connection, long scn) throws SQLException { - this(services, connection.getURL(), newPropsWithSCN(scn,connection.getClientInfo()), connection.metaData, connection.getMutationState(), connection.isDescVarLengthRowKeyUpgrade(), connection.isRunningUpgrade()); + this(services, connection.getURL(), newPropsWithSCN(scn,connection.getClientInfo()), connection.metaData, connection.getMutationState(), connection.isDescVarLengthRowKeyUpgrade(), connection.isRunningUpgrade(), connection.replayMutations); this.isAutoCommit = connection.isAutoCommit; this.isAutoFlush = connection.isAutoFlush; this.sampler = connection.sampler; @@ -204,14 +205,14 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea } public PhoenixConnection(ConnectionQueryServices services, String url, Properties info, PMetaData metaData) throws SQLException { - this(services, url, info, metaData, null, false, false); + this(services, url, info, metaData, null, false, false, false); } public PhoenixConnection(PhoenixConnection connection, ConnectionQueryServices services, Properties info) throws SQLException { - this(services, connection.url, info, connection.metaData, null, connection.isDescVarLengthRowKeyUpgrade(), connection.isRunningUpgrade()); + this(services, connection.url, info, connection.metaData, null, connection.isDescVarLengthRowKeyUpgrade(), connection.isRunningUpgrade(), connection.replayMutations); } - public PhoenixConnection(ConnectionQueryServices services, String url, Properties info, PMetaData metaData, MutationState mutationState, boolean isDescVarLengthRowKeyUpgrade, boolean isRunningUpgrade) throws SQLException { + private PhoenixConnection(ConnectionQueryServices services, String url, Properties info, PMetaData metaData, MutationState mutationState, boolean isDescVarLengthRowKeyUpgrade, boolean isRunningUpgrade, boolean replayMutations) throws SQLException { GLOBAL_PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER.increment(); this.url = url; this.isDescVarLengthRowKeyUpgrade = isDescVarLengthRowKeyUpgrade; @@ -238,7 +239,12 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea Long scnParam = JDBCUtil.getCurrentSCN(url, this.info); checkScn(scnParam); - this.scn = scnParam; + Long replayAtParam = JDBCUtil.getReplayAt(url, this.info); + checkReplayAt(replayAtParam); + checkScnAndReplayAtEquality(scnParam,replayAtParam); + + this.scn = scnParam != null ? 
scnParam : replayAtParam; + this.replayMutations = replayMutations || replayAtParam != null; this.isAutoFlush = this.services.getProps().getBoolean(QueryServices.TRANSACTIONS_ENABLED, QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED) && this.services.getProps().getBoolean(QueryServices.AUTO_FLUSH_ATTRIB, QueryServicesOptions.DEFAULT_AUTO_FLUSH) ; this.isAutoCommit = JDBCUtil.getAutoCommit( @@ -309,6 +315,18 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea } } + private static void checkReplayAt(Long replayAtParam) throws SQLException { + if (replayAtParam != null && replayAtParam < 0) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_REPLAY_AT).build().buildException(); + } + } + + private static void checkScnAndReplayAtEquality(Long scnParam, Long replayAt) throws SQLException { + if (scnParam != null && replayAt != null && !scnParam.equals(replayAt)) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.UNEQUAL_SCN_AND_REPLAY_AT).build().buildException(); + } + } + private static Properties filterKnownNonProperties(Properties info) { Properties prunedProperties = info; for (String property : PhoenixRuntime.CONNECTION_PROPERTIES) { @@ -438,6 +456,10 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea return scn; } + public boolean isReplayMutations() { + return replayMutations; + } + public int getMutateBatchSize() { return mutateBatchSize; } @@ -487,7 +509,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea } private void closeStatements() throws SQLException { - List<SQLCloseable> statements = this.statements; + List<? extends PhoenixStatement> statements = this.statements; // create new list to prevent close of statements // from modifying this list. this.statements = Lists.newArrayList(); @@ -563,6 +585,10 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea throw new SQLFeatureNotSupportedException(); } + public List<PhoenixStatement> getStatements() { + return statements; + } + @Override public Statement createStatement() throws SQLException { PhoenixStatement statement = new PhoenixStatement(this); http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd5ab4fb/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java index cb649d1..cc207d9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java @@ -225,8 +225,16 @@ public class IndexTool extends Configured implements Tool { if (index.getIndexState().equals(PIndexState.BUILDING)) { disableIndexes.add(index.getTableName().getString()); disabledPIndexes.add(index); - if (minDisableTimestamp > index.getIndexDisableTimestamp()) { - minDisableTimestamp = index.getIndexDisableTimestamp(); + // We need a way of differentiating the block writes to data table case from + // the leave index active case. In either case, we need to know the time stamp + // at which writes started failing so we can rebuild from that point. 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd5ab4fb/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index cb649d1..cc207d9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -225,8 +225,16 @@ public class IndexTool extends Configured implements Tool {
             if (index.getIndexState().equals(PIndexState.BUILDING)) {
                 disableIndexes.add(index.getTableName().getString());
                 disabledPIndexes.add(index);
-                if (minDisableTimestamp > index.getIndexDisableTimestamp()) {
-                    minDisableTimestamp = index.getIndexDisableTimestamp();
+                // We need a way of differentiating the block-writes-to-data-table case
+                // from the leave-index-active case. In either case, we need to know the
+                // time stamp at which writes started failing so we can rebuild from that
+                // point. If we keep the index active *and* have a positive
+                // INDEX_DISABLE_TIMESTAMP_BYTES, then writes to the data table will be
+                // blocked (this is client-side logic that we can't change in a minor
+                // release), so we use the sign of the time stamp to differentiate.
+                long indexDisableTimestamp = Math.abs(index.getIndexDisableTimestamp());
+                if (minDisableTimestamp > indexDisableTimestamp) {
+                    minDisableTimestamp = indexDisableTimestamp;
                     indexWithMinDisableTimestamp = index;
                 }
             }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd5ab4fb/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 16db802..0da67ad 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -784,6 +784,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         HTableDescriptor tableDescriptor = (existingDesc != null) ? new HTableDescriptor(existingDesc)
                 : new HTableDescriptor(SchemaUtil.getPhysicalHBaseTableName(tableName, isNamespaceMapped, tableType).getBytes());
+        // By default, do not automatically rebuild/catch up an index on a write failure
         for (Entry<String,Object> entry : tableProps.entrySet()) {
             String key = entry.getKey();
             if (!TableProperty.isPhoenixTableProperty(key)) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd5ab4fb/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 2627207..0c3bb85 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -137,8 +137,9 @@ public interface QueryServices extends SQLCloseable {
     public static final String INDEX_FAILURE_HANDLING_REBUILD_NUMBER_OF_BATCHES_PER_TABLE = "phoenix.index.rebuild.batch.perTable";
 
-    // A master switch if to block writes when index build failed
+    // Block writes to the data table when an index write fails
    public static final String INDEX_FAILURE_BLOCK_WRITE = "phoenix.index.failure.block.write";
+    public static final String INDEX_FAILURE_DISABLE_INDEX = "phoenix.index.failure.disable.index";
 
     // Index will be partially re-built from index disable time stamp - following overlap time
     public static final String INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB =
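How the two switches are meant to interact is easiest to see side by side. The sketch below is illustrative, not code from this patch: the property keys and false defaults come from QueryServices above and QueryServicesOptions below, while the class, mode names, and precedence are assumptions.

import org.apache.hadoop.conf.Configuration;

// Illustrative only: shows the three failure-handling modes the two
// boolean switches are meant to select between.
public class IndexFailureMode {
    public static String choose(Configuration conf) {
        boolean blockWrites = conf.getBoolean("phoenix.index.failure.block.write", false);
        boolean disableIndex = conf.getBoolean("phoenix.index.failure.disable.index", false);
        if (blockWrites) {
            // Keep the index ACTIVE and block data table writes until it is caught up.
            return "BLOCK_DATA_TABLE_WRITES";
        }
        if (disableIndex) {
            // Mark the index DISABLE and rebuild from the index disable time stamp.
            return "DISABLE_AND_REBUILD";
        }
        // Leave the index active and let clients replay mutations via ReplayAt.
        return "LEAVE_INDEX_ACTIVE";
    }
}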
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd5ab4fb/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index eef964f..b9c01f3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -167,7 +167,8 @@ public class QueryServicesOptions {
     public static final int DEFAULT_CLOCK_SKEW_INTERVAL = 2000;
     public static final boolean DEFAULT_INDEX_FAILURE_HANDLING_REBUILD = true; // auto rebuild on
     public static final boolean DEFAULT_INDEX_FAILURE_BLOCK_WRITE = false;
-    public static final long DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_INTERVAL = 10000; // 10 secs
+    public static final boolean DEFAULT_INDEX_FAILURE_DISABLE_INDEX = false;
+    public static final long DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_INTERVAL = 60000; // 60 secs
     public static final long DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME = 1; // 1 ms
 
     /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd5ab4fb/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 8b79867..042ab7f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -159,6 +159,7 @@ import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.RowKeyColumnExpression;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.index.PhoenixIndexFailurePolicy;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixStatement;
@@ -1877,6 +1878,11 @@ public class MetaDataClient {
         if (tableType == PTableType.TABLE) {
             Boolean isAppendOnlySchemaProp = (Boolean) TableProperty.APPEND_ONLY_SCHEMA.getValue(tableProps);
             isAppendOnlySchema = isAppendOnlySchemaProp != null ? isAppendOnlySchemaProp : false;
+
+            // By default, do not rebuild indexes on write failure
+            if (tableProps.get(PhoenixIndexFailurePolicy.REBUILD_INDEX_ON_WRITE_FAILURE) == null) {
+                tableProps.put(PhoenixIndexFailurePolicy.REBUILD_INDEX_ON_WRITE_FAILURE, Boolean.FALSE);
+            }
         }
 
         // Can't set any of these on views or shared indexes on views
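A minimal sketch of reading that per-table flag back on the server side, assuming REBUILD_INDEX_ON_WRITE_FAILURE is carried on the HBase table descriptor like the other table properties copied in ConnectionQueryServicesImpl above. This is not code from the patch; the class, method, and absent-means-rebuild interpretation are assumptions.

import org.apache.hadoop.hbase.HTableDescriptor;

import org.apache.phoenix.index.PhoenixIndexFailurePolicy;

public class RebuildOnFailureCheck {
    public static boolean rebuildOnWriteFailure(HTableDescriptor desc) {
        String value = desc.getValue(PhoenixIndexFailurePolicy.REBUILD_INDEX_ON_WRITE_FAILURE);
        // Assumption: an absent value means the pre-patch behavior (rebuild);
        // MetaDataClient now defaults new tables to FALSE explicitly.
        return value == null || Boolean.parseBoolean(value);
    }
}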
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd5ab4fb/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
index 8d48204..e473198 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -714,17 +714,13 @@ public class IndexUtil {
                 HConstants.NO_NONCE, HConstants.NO_NONCE);
     }
 
-    public static MetaDataMutationResult disableIndexWithTimestamp(String indexTableName, long minTimeStamp,
-            HTableInterface metaTable, boolean blockWriteRebuildIndex) throws ServiceException, Throwable {
+    public static MetaDataMutationResult setIndexDisableTimeStamp(String indexTableName, long minTimeStamp,
+            HTableInterface metaTable, PIndexState newState) throws ServiceException, Throwable {
         byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(indexTableName);
         // Mimic the Put that gets generated by the client on an update of the index state
         Put put = new Put(indexTableKey);
-        if (blockWriteRebuildIndex)
-            put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES,
-                    PIndexState.ACTIVE.getSerializedBytes());
-        else
-            put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES,
-                    PIndexState.DISABLE.getSerializedBytes());
+        put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES,
+                newState.getSerializedBytes());
         put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES,
                 PLong.INSTANCE.toBytes(minTimeStamp));
         put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP_BYTES,

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd5ab4fb/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java
index c081904..d4cfa34 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java
@@ -126,6 +126,11 @@ public class JDBCUtil {
         return (scnStr == null ? null : Long.parseLong(scnStr));
     }
 
+    public static Long getReplayAt(String url, Properties info) throws SQLException {
+        String replayAtStr = findProperty(url, info, PhoenixRuntime.REPLAY_AT_ATTRIB);
+        return (replayAtStr == null ? null : Long.parseLong(replayAtStr));
+    }
+
     @Deprecated // use getMutateBatchSizeBytes
     public static int getMutateBatchSize(String url, Properties info, ReadOnlyProps props) throws SQLException {
         String batchSizeStr = findProperty(url, info, PhoenixRuntime.UPSERT_BATCH_SIZE_ATTRIB);
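A hedged call-site sketch for the renamed IndexUtil helper: the caller now states the target index state directly instead of passing a boolean. The wrapper class, index name, and timestamp below are hypothetical; the signature is the one introduced above.

import org.apache.hadoop.hbase.client.HTableInterface;

import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.util.IndexUtil;

public class LeaveIndexActiveExample {
    public static MetaDataMutationResult markFailure(HTableInterface sysCatalog,
            long failureTimestamp) throws Throwable {
        return IndexUtil.setIndexDisableTimeStamp(
                "MY_SCHEMA.MY_INDEX",   // full index table name (hypothetical)
                failureTimestamp,       // server time at which writes began to fail
                sysCatalog,             // HTableInterface on SYSTEM.CATALOG
                PIndexState.ACTIVE);    // leave the index active rather than DISABLE
    }
}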
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd5ab4fb/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
index 6af881b..0a1fd79 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
@@ -124,6 +124,20 @@ public class PhoenixRuntime {
     public static final String CURRENT_SCN_ATTRIB = "CurrentSCN";
 
     /**
+     * Use this connection property to set the long time stamp value at
+     * which to replay DML statements after a write failure. The time
+     * stamp value must match the value returned by
+     * {@link org.apache.phoenix.execute.CommitException#getServerTimestamp()}
+     * when the exception occurred. Used in conjunction with the
+     * {@link org.apache.phoenix.hbase.index.write.LeaveIndexActiveFailurePolicy}
+     * index write failure policy to give the client a means of replaying
+     * its updates, ensuring that secondary indexes are correctly caught
+     * up with any data table updates after a write failure. The updates
+     * should be replayed in ascending time stamp order.
+     */
+    public static final String REPLAY_AT_ATTRIB = "ReplayAt";
+
+    /**
      * Use this connection property to help with fairness of resource allocation
      * for the client and server. The value of the attribute determines the
      * bucket used to rollup resource usage for a particular tenant/organization. Each tenant
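The javadoc's closing requirement, replay in ascending time stamp order, might look like the following on the client side. This is a sketch under assumptions: the mutationLog structure (server timestamp to SQL string) is a hypothetical client-side log, not part of Phoenix.

import java.sql.Connection;
import java.sql.DriverManager;
import java.util.Map;
import java.util.Properties;
import java.util.SortedMap;

import org.apache.phoenix.util.PhoenixRuntime;

public class AscendingReplay {
    public static void replay(String url, SortedMap<Long, String> mutationLog) throws Exception {
        // A SortedMap iterates keys in ascending order, satisfying the
        // "replay in ascending time stamp order" requirement above.
        for (Map.Entry<Long, String> entry : mutationLog.entrySet()) {
            Properties props = new Properties();
            props.setProperty(PhoenixRuntime.REPLAY_AT_ATTRIB, Long.toString(entry.getKey()));
            try (Connection conn = DriverManager.getConnection(url, props)) {
                conn.createStatement().executeUpdate(entry.getValue());
                conn.commit();
            }
        }
    }
}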
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd5ab4fb/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
index aee1c2e..c5adfa4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
@@ -25,6 +25,7 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.HTableInterface;
@@ -44,6 +45,8 @@ public class ServerUtil {
     private static final String FORMAT = "ERROR %d (%s): %s";
     private static final Pattern PATTERN = Pattern.compile("ERROR (\\d+) \\((\\w+)\\): (.*)");
+    private static final Pattern PATTERN_FOR_TS = Pattern.compile(",serverTimestamp=(\\d+),");
+    private static final String FORMAT_FOR_TIMESTAMP = ",serverTimestamp=%d,";
     private static final Map<Class<? extends Exception>, SQLExceptionCode> errorcodeMap = new HashMap<Class<? extends Exception>, SQLExceptionCode>();
 
     static {
@@ -181,4 +184,38 @@ public class ServerUtil {
         }
         return getTableFromSingletonPool(env, tableName);
     }
+
+    public static long parseServerTimestamp(Throwable t) {
+        while (t.getCause() != null) {
+            t = t.getCause();
+        }
+        return parseTimestampFromRemoteException(t);
+    }
+
+    private static long parseTimestampFromRemoteException(Throwable t) {
+        String message = t.getLocalizedMessage();
+        if (message != null) {
+            // If the message carries the serverTimestamp marker, parse it back out.
+            Matcher matcher = PATTERN_FOR_TS.matcher(message);
+            if (matcher.find()) {
+                String tsString = matcher.group(1);
+                if (tsString != null) {
+                    return Long.parseLong(tsString);
+                }
+            }
+        }
+        return HConstants.LATEST_TIMESTAMP;
+    }
+
+    public static DoNotRetryIOException wrapInDoNotRetryIOException(String msg, Throwable t, long timestamp) {
+        if (msg == null) {
+            msg = "";
+        }
+        if (t instanceof SQLException) {
+            msg = constructSQLErrorMessage((SQLException) t, msg);
+        }
+        msg += String.format(FORMAT_FOR_TIMESTAMP, timestamp);
+        return new DoNotRetryIOException(msg, t);
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd5ab4fb/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java
index dda4248..257ebfc 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java
@@ -38,11 +38,13 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.VersionInfo;
@@ -71,11 +73,15 @@ public class TestIndexWriter {
     assertNotNull(IndexWriter.getCommitter(env));
   }
 
+  @SuppressWarnings("deprecation")
   @Test
   public void getDefaultFailurePolicy() throws Exception {
     Configuration conf = new Configuration(false);
     RegionCoprocessorEnvironment env = Mockito.mock(RegionCoprocessorEnvironment.class);
+    HRegion region = Mockito.mock(HRegion.class);
+    Mockito.when(env.getRegion()).thenReturn(region);
     Mockito.when(env.getConfiguration()).thenReturn(conf);
+    Mockito.when(region.getTableDesc()).thenReturn(new HTableDescriptor());
     assertNotNull(IndexWriter.getFailurePolicy(env));
   }
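Because the timestamp travels inside the exception message, the two ServerUtil constants must stay in sync. A minimal sketch demonstrating the round trip; the surrounding message text is hypothetical, the pattern and format string are the ones defined above.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ServerTimestampRoundTrip {
    private static final Pattern PATTERN_FOR_TS = Pattern.compile(",serverTimestamp=(\\d+),");
    private static final String FORMAT_FOR_TIMESTAMP = ",serverTimestamp=%d,";

    public static void main(String[] args) {
        // Embed the timestamp the way wrapInDoNotRetryIOException does ...
        String msg = "index write failed" + String.format(FORMAT_FOR_TIMESTAMP, 1501000000000L);
        // ... and parse it back the way parseTimestampFromRemoteException does.
        Matcher matcher = PATTERN_FOR_TS.matcher(msg);
        if (matcher.find()) {
            System.out.println(Long.parseLong(matcher.group(1))); // prints 1501000000000
        }
    }
}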
