Repository: phoenix Updated Branches: refs/heads/4.x-HBase-1.2 6b4647805 -> 931d23bfb
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2dba6047/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java index 5da8be8..842e881 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java @@ -31,7 +31,9 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Mutation; @@ -67,7 +69,13 @@ import com.google.common.collect.Multimap; */ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy { private static final Log LOG = LogFactory.getLog(PhoenixIndexFailurePolicy.class); + public static final String DISABLE_INDEX_ON_WRITE_FAILURE = "DISABLE_INDEX_ON_WRITE_FAILURE"; + public static final String REBUILD_INDEX_ON_WRITE_FAILURE = "REBUILD_INDEX_ON_WRITE_FAILURE"; + public static final String BLOCK_DATA_TABLE_WRITES_ON_WRITE_FAILURE = "BLOCK_DATA_TABLE_WRITES_ON_WRITE_FAILURE"; private RegionCoprocessorEnvironment env; + private boolean blockDataTableWritesOnFailure; + private boolean disableIndexOnFailure; + private boolean rebuildIndexOnFailure; public PhoenixIndexFailurePolicy() { super(new KillServerOnFailurePolicy()); @@ -77,6 +85,31 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy { public void setup(Stoppable parent, RegionCoprocessorEnvironment env) { super.setup(parent, env); this.env = env; + rebuildIndexOnFailure = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB, + QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD); + HTableDescriptor htd = env.getRegion().getTableDesc(); + // If rebuild index is turned off globally, no need to check the table because the background thread + // won't be running in this case + if (rebuildIndexOnFailure) { + String value = htd.getValue(REBUILD_INDEX_ON_WRITE_FAILURE); + if (value != null) { + rebuildIndexOnFailure = Boolean.parseBoolean(value); + } + } + String value = htd.getValue(DISABLE_INDEX_ON_WRITE_FAILURE); + if (value == null) { + disableIndexOnFailure = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_DISABLE_INDEX, + QueryServicesOptions.DEFAULT_INDEX_FAILURE_DISABLE_INDEX); + } else { + disableIndexOnFailure = Boolean.parseBoolean(value); + } + value = htd.getValue(BLOCK_DATA_TABLE_WRITES_ON_WRITE_FAILURE); + if (value == null) { + blockDataTableWritesOnFailure = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_BLOCK_WRITE, + QueryServicesOptions.DEFAULT_INDEX_FAILURE_BLOCK_WRITE); + } else { + blockDataTableWritesOnFailure = Boolean.parseBoolean(value); + } } /** @@ -91,30 +124,34 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy { */ @Override public void handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted, Exception cause) throws IOException { - boolean blockWriteRebuildIndex = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_BLOCK_WRITE, - QueryServicesOptions.DEFAULT_INDEX_FAILURE_BLOCK_WRITE); boolean throwing = true; + long timestamp = HConstants.LATEST_TIMESTAMP; try { - handleFailureWithExceptions(attempted, cause, blockWriteRebuildIndex); + timestamp = handleFailureWithExceptions(attempted, cause); throwing = false; } catch (Throwable t) { LOG.warn("handleFailure failed", t); super.handleFailure(attempted, cause); throwing = false; } finally { - if (!throwing) throw ServerUtil.createIOException(null, cause); + if (!throwing) { + throw ServerUtil.wrapInDoNotRetryIOException("Unable to update the following indexes: " + attempted.keySet(), cause, timestamp); + } } } - private void handleFailureWithExceptions(Multimap<HTableInterfaceReference, Mutation> attempted, - Exception cause, boolean blockWriteRebuildIndex) throws Throwable { + private long handleFailureWithExceptions(Multimap<HTableInterfaceReference, Mutation> attempted, + Exception cause) throws Throwable { Set<HTableInterfaceReference> refs = attempted.asMap().keySet(); Map<String, Long> indexTableNames = new HashMap<String, Long>(refs.size()); // start by looking at all the tables to which we attempted to write + long timestamp = 0; + boolean leaveIndexActive = blockDataTableWritesOnFailure || !disableIndexOnFailure; for (HTableInterfaceReference ref : refs) { long minTimeStamp = 0; // get the minimum timestamp across all the mutations we attempted on that table + // FIXME: all cell timestamps should be the same Collection<Mutation> mutations = attempted.get(ref); if (mutations != null) { for (Mutation m : mutations) { @@ -127,6 +164,7 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy { } } } + timestamp = minTimeStamp; // If the data table has local index column families then get local indexes to disable. if (ref.getTableName().equals(env.getRegion().getTableDesc().getNameAsString()) @@ -139,37 +177,59 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy { } } + // Nothing to do if we're not disabling the index and not rebuilding on failure + if (!disableIndexOnFailure && !rebuildIndexOnFailure) { + return timestamp; + } + + PIndexState newState = disableIndexOnFailure ? PIndexState.DISABLE : PIndexState.ACTIVE; // for all the index tables that we've found, try to disable them and if that fails, try to for (Map.Entry<String, Long> tableTimeElement :indexTableNames.entrySet()){ String indexTableName = tableTimeElement.getKey(); long minTimeStamp = tableTimeElement.getValue(); + // We need a way of differentiating the block writes to data table case from + // the leave index active case. In either case, we need to know the time stamp + // at which writes started failing so we can rebuild from that point. If we + // keep the index active *and* have a positive INDEX_DISABLE_TIMESTAMP_BYTES, + // then writes to the data table will be blocked (this is client side logic + // and we can't change this in a minor release). So we use the sign of the + // time stamp to differentiate. + if (!disableIndexOnFailure && !blockDataTableWritesOnFailure) { + minTimeStamp *= -1; + } // Disable the index by using the updateIndexState method of MetaDataProtocol end point coprocessor. HTableInterface systemTable = env.getTable(SchemaUtil .getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, env.getConfiguration())); - MetaDataMutationResult result = IndexUtil.disableIndexWithTimestamp(indexTableName, minTimeStamp, - systemTable, blockWriteRebuildIndex); + MetaDataMutationResult result = IndexUtil.setIndexDisableTimeStamp(indexTableName, minTimeStamp, + systemTable, newState); if (result.getMutationCode() == MutationCode.TABLE_NOT_FOUND) { LOG.info("Index " + indexTableName + " has been dropped. Ignore uncommitted mutations"); continue; } if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) { - if (blockWriteRebuildIndex) { + if (leaveIndexActive) { LOG.warn("Attempt to update INDEX_DISABLE_TIMESTAMP " + " failed with code = " + result.getMutationCode()); - throw new DoNotRetryIOException("Attempt to update INDEX_DISABLE_TIMESTAMP failed."); + // If we're not disabling the index, then we don't want to throw as throwing + // will lead to the RS being shutdown. + if (blockDataTableWritesOnFailure) { + throw new DoNotRetryIOException("Attempt to update INDEX_DISABLE_TIMESTAMP failed."); + } } else { LOG.warn("Attempt to disable index " + indexTableName + " failed with code = " + result.getMutationCode() + ". Will use default failure policy instead."); throw new DoNotRetryIOException("Attempt to disable " + indexTableName + " failed."); } } - if (blockWriteRebuildIndex) + if (leaveIndexActive) LOG.info("Successfully update INDEX_DISABLE_TIMESTAMP for " + indexTableName + " due to an exception while writing updates.", cause); else LOG.info("Successfully disabled index " + indexTableName + " due to an exception while writing updates.", cause); } + // Return the cell time stamp (note they should all be the same) + return timestamp; } private Collection<? extends String> getLocalIndexNames(HTableInterfaceReference ref, http://git-wip-us.apache.org/repos/asf/phoenix/blob/2dba6047/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java index b7153a3..e6c1af2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java @@ -64,6 +64,7 @@ import org.apache.phoenix.hbase.index.covered.update.ColumnTracker; import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.hbase.index.write.IndexWriter; +import org.apache.phoenix.hbase.index.write.LeaveIndexActiveFailurePolicy; import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.schema.types.PVarbinary; import org.apache.phoenix.trace.TracingUtils; @@ -104,7 +105,9 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { codec.initialize(env); // setup the actual index writer - this.writer = new IndexWriter(env, serverName + "-tx-index-writer"); + // For transactional tables, we keep the index active upon a write failure + // since we have the all versus none behavior for transactions. + this.writer = new IndexWriter(new LeaveIndexActiveFailurePolicy(), env, serverName + "-tx-index-writer"); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/2dba6047/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java index 5f5237f..3650c2d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java @@ -145,8 +145,9 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea private final int mutateBatchSize; private final long mutateBatchSizeBytes; private final Long scn; + private final boolean replayMutations; private MutationState mutationState; - private List<SQLCloseable> statements = new ArrayList<SQLCloseable>(); + private List<PhoenixStatement> statements = new ArrayList<>(); private boolean isAutoFlush = false; private boolean isAutoCommit = false; private PMetaData metaData; @@ -179,7 +180,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea } public PhoenixConnection(PhoenixConnection connection, boolean isDescRowKeyOrderUpgrade, boolean isRunningUpgrade) throws SQLException { - this(connection.getQueryServices(), connection.getURL(), connection.getClientInfo(), connection.metaData, connection.getMutationState(), isDescRowKeyOrderUpgrade, isRunningUpgrade); + this(connection.getQueryServices(), connection.getURL(), connection.getClientInfo(), connection.metaData, connection.getMutationState(), isDescRowKeyOrderUpgrade, isRunningUpgrade, connection.replayMutations); this.isAutoCommit = connection.isAutoCommit; this.isAutoFlush = connection.isAutoFlush; this.sampler = connection.sampler; @@ -191,7 +192,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea } public PhoenixConnection(PhoenixConnection connection, MutationState mutationState) throws SQLException { - this(connection.getQueryServices(), connection.getURL(), connection.getClientInfo(), connection.getMetaDataCache(), mutationState, connection.isDescVarLengthRowKeyUpgrade(), connection.isRunningUpgrade()); + this(connection.getQueryServices(), connection.getURL(), connection.getClientInfo(), connection.getMetaDataCache(), mutationState, connection.isDescVarLengthRowKeyUpgrade(), connection.isRunningUpgrade(), connection.replayMutations); } public PhoenixConnection(PhoenixConnection connection, long scn) throws SQLException { @@ -199,7 +200,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea } public PhoenixConnection(ConnectionQueryServices services, PhoenixConnection connection, long scn) throws SQLException { - this(services, connection.getURL(), newPropsWithSCN(scn,connection.getClientInfo()), connection.metaData, connection.getMutationState(), connection.isDescVarLengthRowKeyUpgrade(), connection.isRunningUpgrade()); + this(services, connection.getURL(), newPropsWithSCN(scn,connection.getClientInfo()), connection.metaData, connection.getMutationState(), connection.isDescVarLengthRowKeyUpgrade(), connection.isRunningUpgrade(), connection.replayMutations); this.isAutoCommit = connection.isAutoCommit; this.isAutoFlush = connection.isAutoFlush; this.sampler = connection.sampler; @@ -207,14 +208,14 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea } public PhoenixConnection(ConnectionQueryServices services, String url, Properties info, PMetaData metaData) throws SQLException { - this(services, url, info, metaData, null, false, false); + this(services, url, info, metaData, null, false, false, false); } public PhoenixConnection(PhoenixConnection connection, ConnectionQueryServices services, Properties info) throws SQLException { - this(services, connection.url, info, connection.metaData, null, connection.isDescVarLengthRowKeyUpgrade(), connection.isRunningUpgrade()); + this(services, connection.url, info, connection.metaData, null, connection.isDescVarLengthRowKeyUpgrade(), connection.isRunningUpgrade(), connection.replayMutations); } - public PhoenixConnection(ConnectionQueryServices services, String url, Properties info, PMetaData metaData, MutationState mutationState, boolean isDescVarLengthRowKeyUpgrade, boolean isRunningUpgrade) throws SQLException { + private PhoenixConnection(ConnectionQueryServices services, String url, Properties info, PMetaData metaData, MutationState mutationState, boolean isDescVarLengthRowKeyUpgrade, boolean isRunningUpgrade, boolean replayMutations) throws SQLException { GLOBAL_PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER.increment(); this.url = url; this.isDescVarLengthRowKeyUpgrade = isDescVarLengthRowKeyUpgrade; @@ -241,7 +242,12 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea Long scnParam = JDBCUtil.getCurrentSCN(url, this.info); checkScn(scnParam); - this.scn = scnParam; + Long replayAtParam = JDBCUtil.getReplayAt(url, this.info); + checkReplayAt(replayAtParam); + checkScnAndReplayAtEquality(scnParam,replayAtParam); + + this.scn = scnParam != null ? scnParam : replayAtParam; + this.replayMutations = replayMutations || replayAtParam != null; this.isAutoFlush = this.services.getProps().getBoolean(QueryServices.TRANSACTIONS_ENABLED, QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED) && this.services.getProps().getBoolean(QueryServices.AUTO_FLUSH_ATTRIB, QueryServicesOptions.DEFAULT_AUTO_FLUSH) ; this.isAutoCommit = JDBCUtil.getAutoCommit( @@ -315,6 +321,18 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea } } + private static void checkReplayAt(Long replayAtParam) throws SQLException { + if (replayAtParam != null && replayAtParam < 0) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_REPLAY_AT).build().buildException(); + } + } + + private static void checkScnAndReplayAtEquality(Long scnParam, Long replayAt) throws SQLException { + if (scnParam != null && replayAt != null && !scnParam.equals(replayAt)) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.UNEQUAL_SCN_AND_REPLAY_AT).build().buildException(); + } + } + private static Properties filterKnownNonProperties(Properties info) { Properties prunedProperties = info; for (String property : PhoenixRuntime.CONNECTION_PROPERTIES) { @@ -444,6 +462,10 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea return scn; } + public boolean isReplayMutations() { + return replayMutations; + } + public int getMutateBatchSize() { return mutateBatchSize; } @@ -493,7 +515,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea } private void closeStatements() throws SQLException { - List<SQLCloseable> statements = this.statements; + List<? extends PhoenixStatement> statements = this.statements; // create new list to prevent close of statements // from modifying this list. this.statements = Lists.newArrayList(); @@ -569,6 +591,10 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea throw new SQLFeatureNotSupportedException(); } + public List<PhoenixStatement> getStatements() { + return statements; + } + @Override public Statement createStatement() throws SQLException { PhoenixStatement statement = new PhoenixStatement(this); http://git-wip-us.apache.org/repos/asf/phoenix/blob/2dba6047/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java index 3606593..da216ed 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java @@ -225,8 +225,16 @@ public class IndexTool extends Configured implements Tool { if (index.getIndexState().equals(PIndexState.BUILDING)) { disableIndexes.add(index.getTableName().getString()); disabledPIndexes.add(index); - if (minDisableTimestamp > index.getIndexDisableTimestamp()) { - minDisableTimestamp = index.getIndexDisableTimestamp(); + // We need a way of differentiating the block writes to data table case from + // the leave index active case. In either case, we need to know the time stamp + // at which writes started failing so we can rebuild from that point. If we + // keep the index active *and* have a positive INDEX_DISABLE_TIMESTAMP_BYTES, + // then writes to the data table will be blocked (this is client side logic + // and we can't change this in a minor release). So we use the sign of the + // time stamp to differentiate. + long indexDisableTimestamp = Math.abs(index.getIndexDisableTimestamp()); + if (minDisableTimestamp > indexDisableTimestamp) { + minDisableTimestamp = indexDisableTimestamp; indexWithMinDisableTimestamp = index; } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/2dba6047/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index 489ffb4..892a12b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -68,7 +68,6 @@ import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Types; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -783,6 +782,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement HTableDescriptor tableDescriptor = (existingDesc != null) ? new HTableDescriptor(existingDesc) : new HTableDescriptor( SchemaUtil.getPhysicalHBaseTableName(tableName, isNamespaceMapped, tableType).getBytes()); + // By default, do not automatically rebuild/catch up an index on a write failure for (Entry<String,Object> entry : tableProps.entrySet()) { String key = entry.getKey(); if (!TableProperty.isPhoenixTableProperty(key)) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/2dba6047/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index c01d11f..81d05bd 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -138,8 +138,9 @@ public interface QueryServices extends SQLCloseable { public static final String INDEX_FAILURE_HANDLING_REBUILD_NUMBER_OF_BATCHES_PER_TABLE = "phoenix.index.rebuild.batch.perTable"; - // A master switch if to block writes when index build failed + // Block writes to data table when index write fails public static final String INDEX_FAILURE_BLOCK_WRITE = "phoenix.index.failure.block.write"; + public static final String INDEX_FAILURE_DISABLE_INDEX = "phoenix.index.failure.disable.index"; // Index will be partially re-built from index disable time stamp - following overlap time public static final String INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB = http://git-wip-us.apache.org/repos/asf/phoenix/blob/2dba6047/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index 1ddf7eb..5541dcf 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -168,7 +168,8 @@ public class QueryServicesOptions { public static final int DEFAULT_CLOCK_SKEW_INTERVAL = 2000; public static final boolean DEFAULT_INDEX_FAILURE_HANDLING_REBUILD = true; // auto rebuild on public static final boolean DEFAULT_INDEX_FAILURE_BLOCK_WRITE = false; - public static final long DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_INTERVAL = 10000; // 10 secs + public static final boolean DEFAULT_INDEX_FAILURE_DISABLE_INDEX = false; + public static final long DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_INTERVAL = 60000; // 60 secs public static final long DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME = 1; // 1 ms /** http://git-wip-us.apache.org/repos/asf/phoenix/blob/2dba6047/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index 50ff64b..8005e4a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -159,6 +159,7 @@ import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.RowKeyColumnExpression; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; import org.apache.phoenix.index.IndexMaintainer; +import org.apache.phoenix.index.PhoenixIndexFailurePolicy; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.jdbc.PhoenixStatement; @@ -230,7 +231,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Strings; -import com.google.common.collect.ImmutableListMultimap; import com.google.common.collect.Iterators; import com.google.common.collect.ListMultimap; import com.google.common.collect.Lists; @@ -1877,6 +1877,11 @@ public class MetaDataClient { if (tableType == PTableType.TABLE) { Boolean isAppendOnlySchemaProp = (Boolean) TableProperty.APPEND_ONLY_SCHEMA.getValue(tableProps); isAppendOnlySchema = isAppendOnlySchemaProp!=null ? isAppendOnlySchemaProp : false; + + // By default, do not rebuild indexes on write failure + if (tableProps.get(PhoenixIndexFailurePolicy.REBUILD_INDEX_ON_WRITE_FAILURE) == null) { + tableProps.put(PhoenixIndexFailurePolicy.REBUILD_INDEX_ON_WRITE_FAILURE, Boolean.FALSE); + } } // Can't set any of these on views or shared indexes on views http://git-wip-us.apache.org/repos/asf/phoenix/blob/2dba6047/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java index 3e2c9b5..986debd 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java @@ -713,17 +713,13 @@ public class IndexUtil { HConstants.NO_NONCE, HConstants.NO_NONCE); } - public static MetaDataMutationResult disableIndexWithTimestamp(String indexTableName, long minTimeStamp, - HTableInterface metaTable, boolean blockWriteRebuildIndex) throws ServiceException, Throwable { + public static MetaDataMutationResult setIndexDisableTimeStamp(String indexTableName, long minTimeStamp, + HTableInterface metaTable, PIndexState newState) throws ServiceException, Throwable { byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(indexTableName); // Mimic the Put that gets generated by the client on an update of the index state Put put = new Put(indexTableKey); - if (blockWriteRebuildIndex) - put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES, - PIndexState.ACTIVE.getSerializedBytes()); - else - put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES, - PIndexState.DISABLE.getSerializedBytes()); + put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES, + newState.getSerializedBytes()); put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES, PLong.INSTANCE.toBytes(minTimeStamp)); put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP_BYTES, http://git-wip-us.apache.org/repos/asf/phoenix/blob/2dba6047/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java index 2cab6fb..76d454b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java @@ -133,6 +133,11 @@ public class JDBCUtil { return (scnStr == null ? null : Long.parseLong(scnStr)); } + public static Long getReplayAt(String url, Properties info) throws SQLException { + String scnStr = findProperty(url, info, PhoenixRuntime.REPLAY_AT_ATTRIB); + return (scnStr == null ? null : Long.parseLong(scnStr)); + } + @Deprecated // use getMutateBatchSizeBytes public static int getMutateBatchSize(String url, Properties info, ReadOnlyProps props) throws SQLException { String batchSizeStr = findProperty(url, info, PhoenixRuntime.UPSERT_BATCH_SIZE_ATTRIB); http://git-wip-us.apache.org/repos/asf/phoenix/blob/2dba6047/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java index 167a35c..7f8731a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java @@ -124,6 +124,20 @@ public class PhoenixRuntime { public static final String CURRENT_SCN_ATTRIB = "CurrentSCN"; /** + * Use this connection property to set the long time stamp value at + * which to replay DML statements after a write failure. The time + * stamp value must match the value returned by + * {@link org.apache.phoenix.execute.CommitException#getServerTimestamp()} + * when the exception occurred. Used in conjunction with the + * {@link org.apache.phoenix.hbase.index.write.LeaveIndexActiveFailurePolicy} + * index write failure policy to provide a means of the client replaying + * updates to ensure that secondary indexes are correctly caught up + * with any data updates when a write failure occurs. The updates + * should be replayed in ascending time stamp order. + */ + public static final String REPLAY_AT_ATTRIB = "ReplayAt"; + + /** * Use this connection property to help with fairness of resource allocation * for the client and server. The value of the attribute determines the * bucket used to rollup resource usage for a particular tenant/organization. Each tenant http://git-wip-us.apache.org/repos/asf/phoenix/blob/2dba6047/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java index a3940fc..c90d061 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java @@ -25,6 +25,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.HTableInterface; @@ -44,6 +45,8 @@ public class ServerUtil { private static final String FORMAT = "ERROR %d (%s): %s"; private static final Pattern PATTERN = Pattern.compile("ERROR (\\d+) \\((\\w+)\\): (.*)"); + private static final Pattern PATTERN_FOR_TS = Pattern.compile(",serverTimestamp=(\\d+),"); + private static final String FORMAT_FOR_TIMESTAMP = ",serverTimestamp=%d,"; private static final Map<Class<? extends Exception>, SQLExceptionCode> errorcodeMap = new HashMap<Class<? extends Exception>, SQLExceptionCode>(); static { @@ -176,4 +179,38 @@ public class ServerUtil { } return getTableFromSingletonPool(env, tableName); } + + public static long parseServerTimestamp(Throwable t) { + while (t.getCause() != null) { + t = t.getCause(); + } + return parseTimestampFromRemoteException(t); + } + + private static long parseTimestampFromRemoteException(Throwable t) { + String message = t.getLocalizedMessage(); + if (message != null) { + // If the message matches the standard pattern, recover the SQLException and throw it. + Matcher matcher = PATTERN_FOR_TS.matcher(t.getLocalizedMessage()); + if (matcher.find()) { + String tsString = matcher.group(1); + if (tsString != null) { + return Long.parseLong(tsString); + } + } + } + return HConstants.LATEST_TIMESTAMP; + } + + public static DoNotRetryIOException wrapInDoNotRetryIOException(String msg, Throwable t, long timestamp) { + if (msg == null) { + msg = ""; + } + if (t instanceof SQLException) { + msg = constructSQLErrorMessage((SQLException) t, msg); + } + msg += String.format(FORMAT_FOR_TIMESTAMP, timestamp); + return new DoNotRetryIOException(msg, t); + } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/2dba6047/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java index 76ea933..8317b5c 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java @@ -38,11 +38,13 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.VersionInfo; @@ -71,11 +73,15 @@ public class TestIndexWriter { assertNotNull(IndexWriter.getCommitter(env)); } + @SuppressWarnings("deprecation") @Test public void getDefaultFailurePolicy() throws Exception { Configuration conf = new Configuration(false); RegionCoprocessorEnvironment env = Mockito.mock(RegionCoprocessorEnvironment.class); + Region region = Mockito.mock(Region.class); + Mockito.when(env.getRegion()).thenReturn(region); Mockito.when(env.getConfiguration()).thenReturn(conf); + Mockito.when(region.getTableDesc()).thenReturn(new HTableDescriptor()); assertNotNull(IndexWriter.getFailurePolicy(env)); }
