Repository: phoenix
Updated Branches:
  refs/heads/master 909d97596 -> faeab9355
PHOENIX-1289 Drop index during upsert may abort RS (daniel meng + jyates)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/faeab935
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/faeab935
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/faeab935

Branch: refs/heads/master
Commit: faeab935554404a042285a01127e9b88b8e3a47c
Parents: 909d975
Author: Jesse Yates <jya...@apache.org>
Authored: Mon Oct 6 10:58:14 2014 -0700
Committer: Jesse Yates <jya...@apache.org>
Committed: Mon Oct 6 11:04:58 2014 -0700

----------------------------------------------------------------------
 .../end2end/index/DropIndexDuringUpsertIT.java  | 177 ++++++++++++++
 .../index/write/KillServerOnFailurePolicy.java  |   2 +-
 .../index/PhoenixIndexFailurePolicy.java        | 239 +++++++++++--------
 3 files changed, 316 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/faeab935/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/DropIndexDuringUpsertIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/DropIndexDuringUpsertIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/DropIndexDuringUpsertIT.java
new file mode 100644
index 0000000..4e44ec8
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/DropIndexDuringUpsertIT.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.index;
+
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
+import static org.apache.phoenix.util.PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM;
+import static org.apache.phoenix.util.TestUtil.LOCALHOST;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.jdbc.PhoenixTestDriver;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.StringUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.collect.Maps;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class DropIndexDuringUpsertIT extends BaseTest {
+    private static final int NUM_SLAVES = 4;
+    private static String url;
+    private static PhoenixTestDriver driver;
+    private static HBaseTestingUtility util;
+
+    private static ExecutorService service = Executors.newCachedThreadPool();
+
+    private static final String SCHEMA_NAME = "S";
+    private static final String INDEX_TABLE_NAME = "I";
+    private static final String DATA_TABLE_FULL_NAME = SchemaUtil.getTableName(SCHEMA_NAME, "T");
+    private static final String INDEX_TABLE_FULL_NAME = SchemaUtil.getTableName(SCHEMA_NAME, "I");
+
+    @Before
+    public void doSetup() throws Exception {
+        Configuration conf = HBaseConfiguration.create();
+        setUpConfigForMiniCluster(conf);
+        conf.setInt("hbase.client.retries.number", 2);
+        conf.setInt("hbase.client.pause", 5000);
+        conf.setInt("hbase.balancer.period", Integer.MAX_VALUE);
+        conf.setLong(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB, 0);
+        util = new HBaseTestingUtility(conf);
+        util.startMiniCluster(NUM_SLAVES);
+        String clientPort = util.getConfiguration().get(QueryServices.ZOOKEEPER_PORT_ATTRIB);
+        url = JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + LOCALHOST + JDBC_PROTOCOL_SEPARATOR + clientPort
+                + JDBC_PROTOCOL_TERMINATOR + PHOENIX_TEST_DRIVER_URL_PARAM;
+
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+        // Must update config before starting server
+        props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true));
+        driver = initAndRegisterDriver(url, new ReadOnlyProps(props.entrySet().iterator()));
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        try {
+            destroyDriver(driver);
+        } finally {
+            util.shutdownMiniCluster();
+        }
+    }
+
+    @Test(timeout = 300000)
+    public void testWriteFailureDropIndex() throws Exception {
+        String query;
+        ResultSet rs;
+
+        // create the table and ensure it's empty
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = driver.connect(url, props);
+        conn.createStatement().execute(
+                "CREATE TABLE " + DATA_TABLE_FULL_NAME + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
+        query = "SELECT * FROM " + DATA_TABLE_FULL_NAME;
+        rs = conn.createStatement().executeQuery(query);
+        assertFalse(rs.next());
+
+        // create the index and ensure it's empty as well
+        conn.createStatement().execute(
+                "CREATE INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_FULL_NAME + " (v1) INCLUDE (v2)");
+        query = "SELECT * FROM " + INDEX_TABLE_FULL_NAME;
+        rs = conn.createStatement().executeQuery(query);
+        assertFalse(rs.next());
+
+        // Verify the metadata for the index is correct.
+        rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME,
+                new String[] { PTableType.INDEX.toString() });
+        assertTrue(rs.next());
+        assertEquals(INDEX_TABLE_NAME, rs.getString(3));
+        assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE"));
+        assertFalse(rs.next());
+
+        // do an upsert on a separate thread
+        Future<Boolean> future = service.submit(new UpsertTask());
+        Thread.sleep(500);
+
+        // at the same time, drop the index table
+        conn.createStatement().execute("drop index " + INDEX_TABLE_NAME + " on " + DATA_TABLE_FULL_NAME);
+
+        // verify the index is dropped
+        query = "SELECT count(1) FROM " + INDEX_TABLE_FULL_NAME;
+        try {
+            conn.createStatement().executeQuery(query);
+            fail();
+        } catch (SQLException e) {
+        }
+
+        // assert that KillServerOnFailurePolicy was not triggered
+        assertTrue(future.get());
+    }
+
+    private static class UpsertTask implements Callable<Boolean> {
+
+        private Connection conn = null;
+
+        public UpsertTask() throws SQLException {
+            Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+            conn = driver.connect(url, props);
+        }
+
+        @Override
+        public Boolean call() throws Exception {
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + DATA_TABLE_FULL_NAME + " VALUES(?,?,?)");
+            for (int i = 0; i < 500; i++) {
+                stmt.setString(1, "a");
+                stmt.setString(2, "x");
+                stmt.setString(3, Integer.toString(i));
+                stmt.execute();
+                conn.commit();
+            }
+            return true;
+        }
+
+    }
+
+}


http://git-wip-us.apache.org/repos/asf/phoenix/blob/faeab935/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/KillServerOnFailurePolicy.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/KillServerOnFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/KillServerOnFailurePolicy.java
index 0b84cdf..2fb43b5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/KillServerOnFailurePolicy.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/KillServerOnFailurePolicy.java
@@ -61,7 +61,7 @@ public class KillServerOnFailurePolicy implements IndexFailurePolicy {
 
   @Override
   public void
-  handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted, Exception cause) throws IOException {
+  handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted, Exception cause) {
     // cleanup resources
     this.stop("Killing ourselves because of an error:" + cause);
     // notify the regionserver of the failure
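The one-line change above is the crux of the contract shift: handleFailure() no longer declares IOException, so a failure policy is now expected to contain its own errors and to treat aborting the region server strictly as a last resort. A minimal sketch of that pattern follows (illustrative only: the class name and the recover() hook are hypothetical, and the imports assume the package layout used elsewhere in this diff):

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
import org.apache.phoenix.hbase.index.write.KillServerOnFailurePolicy;

import com.google.common.collect.Multimap;

public class ContainFailuresPolicy extends KillServerOnFailurePolicy {
    private static final Log LOG = LogFactory.getLog(ContainFailuresPolicy.class);

    @Override
    public void handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted, Exception cause) {
        try {
            // try something recoverable first, e.g. marking the index offline
            recover(attempted, cause);
        } catch (Throwable t) {
            // only if recovery itself fails do we fall through and kill the server
            LOG.warn("Recovery failed; falling back to the kill-server policy", t);
            super.handleFailure(attempted, cause);
        }
    }

    // hypothetical hook; PhoenixIndexFailurePolicy's equivalent is handleFailureWithExceptions()
    protected void recover(Multimap<HTableInterfaceReference, Mutation> attempted, Exception cause)
            throws Exception {
        // application-specific recovery goes here
    }
}

PhoenixIndexFailurePolicy, below, is refactored into exactly this shape.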
http://git-wip-us.apache.org/repos/asf/phoenix/blob/faeab935/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
index b683c20..565b28c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
@@ -19,13 +19,7 @@ package org.apache.phoenix.index;
 
 import java.io.IOException;
 import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -89,91 +83,79 @@ public class PhoenixIndexFailurePolicy extends KillServerOnFailurePolicy {
         this.env = env;
     }
 
+    /**
+     * Attempt to disable the index table when we can't write to it, preventing future updates until the index is
+     * brought up to date, but allowing historical reads to continue until then.
+     * <p>
+     * In the case that we cannot reach the metadata information, we will fall back to the default policy and kill
+     * this server, so we can attempt to replay the edits on restart.
+     * </p>
+     * @param attempted the mutations that were attempted to be written and the tables to which they were written
+     * @param cause root cause of the failure
+     */
     @Override
-    public void handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted, Exception cause) throws IOException {
-        Set<HTableInterfaceReference> refs = attempted.asMap().keySet();
-        List<String> indexTableNames = new ArrayList<String>(1);
+    public void handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted, Exception cause) {
+        try {
-            for (HTableInterfaceReference ref : refs) {
-                long minTimeStamp = 0;
-                Collection<Mutation> mutations = attempted.get(ref);
-                if (mutations != null) {
-                    for (Mutation m : mutations) {
+            handleFailureWithExceptions(attempted, cause);
+        } catch (Throwable t) {
+            LOG.warn("handleFailure failed", t);
+            super.handleFailure(attempted, cause);
+        }
+    }
+
+    private void handleFailureWithExceptions(Multimap<HTableInterfaceReference, Mutation> attempted,
+            Exception cause) throws Throwable {
+        Set<HTableInterfaceReference> refs = attempted.asMap().keySet();
+        Map<String, Long> indexTableNames = new HashMap<String, Long>(refs.size());
+        // start by looking at all the tables to which we attempted to write
+        for (HTableInterfaceReference ref : refs) {
+            long minTimeStamp = 0;
+
+            // get the minimum timestamp across all the mutations we attempted on that table
+            Collection<Mutation> mutations = attempted.get(ref);
+            if (mutations != null) {
+                for (Mutation m : mutations) {
                     for (List<Cell> kvs : m.getFamilyCellMap().values()) {
-                            for (Cell kv : kvs) {
-                                if (minTimeStamp == 0 || (kv.getTimestamp() >=0 && minTimeStamp < kv.getTimestamp())) {
-                                    minTimeStamp = kv.getTimestamp();
-                                }
-                            }
-                        }
-                    }
-                }
-
-                if(ref.getTableName().startsWith(MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX)) {
-                    PhoenixConnection conn = null;
-                    try {
-                        conn = QueryUtil.getConnection(this.env.getConfiguration()).unwrap(
-                            PhoenixConnection.class);
-                        String userTableName = MetaDataUtil.getUserTableName(ref.getTableName());
-                        PTable dataTable = PhoenixRuntime.getTable(conn, userTableName);
-                        List<PTable> indexes = dataTable.getIndexes();
-                        // local index used to get view id from index mutation row key.
-                        PTable localIndex = null;
-                        Map<ImmutableBytesWritable, String> localIndexNames =
-                                new HashMap<ImmutableBytesWritable, String>();
-                        for (PTable index : indexes) {
-                            if (index.getIndexType() == IndexType.LOCAL
-                                    && index.getIndexState() == PIndexState.ACTIVE) {
-                                if (localIndex == null) localIndex = index;
-                                localIndexNames.put(new ImmutableBytesWritable(MetaDataUtil.getViewIndexIdDataType().toBytes(
-                                    index.getViewIndexId())),index.getName().getString());
-                            }
-                        }
-                        if(localIndex == null) continue;
-
-                        IndexMaintainer indexMaintainer = localIndex.getIndexMaintainer(dataTable);
-                        HRegionInfo regionInfo = this.env.getRegion().getRegionInfo();
-                        int offset =
-                                regionInfo.getStartKey().length == 0 ? regionInfo.getEndKey().length
-                                        : regionInfo.getStartKey().length;
-                        byte[] viewId = null;
-                        for (Mutation mutation : mutations) {
-                            viewId = indexMaintainer.getViewIndexIdFromIndexRowKey(new ImmutableBytesWritable(mutation.getRow(), offset, mutation.getRow().length - offset));
-                            String indexTableName = localIndexNames.get(new ImmutableBytesWritable(viewId));
-                            if(!indexTableNames.contains(indexTableName)) {
-                                indexTableNames.add(indexTableName);
-                            }
-                        }
-                    } catch (ClassNotFoundException e) {
-                        throw new IOException(e);
-                    } catch (SQLException e) {
-                        throw new IOException(e);
-                    } finally {
-                        if (conn != null) {
-                            try {
-                                conn.close();
-                            } catch (SQLException e) {
-                                throw new IOException(e);
+                        for (Cell kv : kvs) {
+                            if (minTimeStamp == 0 || (kv.getTimestamp() >= 0 && minTimeStamp < kv.getTimestamp())) {
+                                minTimeStamp = kv.getTimestamp();
                             }
                         }
                     }
-                } else {
-                    indexTableNames.add(ref.getTableName());
                 }
+            }
+
+            // it's a local index table, so convert it to the index table names we should disable
+            if (ref.getTableName().startsWith(MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX)) {
+                for (String tableName : getLocalIndexNames(ref, mutations)) {
+                    indexTableNames.put(tableName, minTimeStamp);
+                }
+            } else {
+                indexTableNames.put(ref.getTableName(), minTimeStamp);
+            }
+        }
+
+        // for all the index tables that we've found, try to disable them; if that fails, the caller
+        // falls back to the default (kill server) policy
+        for (Map.Entry<String, Long> tableTimeElement : indexTableNames.entrySet()) {
+            String indexTableName = tableTimeElement.getKey();
+            long minTimeStamp = tableTimeElement.getValue();
+            // Disable the index by using the updateIndexState method of MetaDataProtocol end point coprocessor.
+            byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(indexTableName);
+            HTableInterface systemTable =
+                    env.getTable(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES));
+            // Mimic the Put that gets generated by the client on an update of the index state
+            Put put = new Put(indexTableKey);
+            put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES,
+                    PIndexState.DISABLE.getSerializedBytes());
+            put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES,
+                    PDataType.LONG.toBytes(minTimeStamp));
+            final List<Mutation> tableMetadata = Collections.<Mutation>singletonList(put);
 
-            for (String indexTableName : indexTableNames) {
-                // Disable the index by using the updateIndexState method of MetaDataProtocol end point coprocessor.
-                byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(indexTableName);
-                HTableInterface systemTable = env.getTable(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES));
-                // Mimic the Put that gets generated by the client on an update of the index state
-                Put put = new Put(indexTableKey);
-                put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES, PIndexState.DISABLE.getSerializedBytes());
-                put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES, PDataType.LONG.toBytes(minTimeStamp));
-                final List<Mutation> tableMetadata = Collections.<Mutation>singletonList(put);
-
-                final Map<byte[], MetaDataResponse> results =
-                        systemTable.coprocessorService(MetaDataService.class, indexTableKey, indexTableKey,
-                            new Batch.Call<MetaDataService, MetaDataResponse>() {
+            final Map<byte[], MetaDataResponse> results =
+                    systemTable.coprocessorService(MetaDataService.class, indexTableKey, indexTableKey,
+                        new Batch.Call<MetaDataService, MetaDataResponse>() {
                             @Override
                             public MetaDataResponse call(MetaDataService instance) throws IOException {
                                 ServerRpcController controller = new ServerRpcController();
@@ -185,30 +167,85 @@ public class PhoenixIndexFailurePolicy extends KillServerOnFailurePolicy {
                                     builder.addTableMetadataMutations(mp.toByteString());
                                 }
                                 instance.updateIndexState(controller, builder.build(), rpcCallback);
-                                if(controller.getFailedOn() != null) {
+                                if (controller.getFailedOn() != null) {
                                     throw controller.getFailedOn();
                                 }
                                 return rpcCallback.get();
                             }
                         });
-                if(results.isEmpty()){
-                    throw new IOException("Didn't get expected result size");
-                }
-                MetaDataResponse tmpResponse = results.values().iterator().next();
-                MetaDataMutationResult result = MetaDataMutationResult.constructFromProto(tmpResponse);
-
-                if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) {
-                    LOG.warn("Attempt to disable index " + indexTableName + " failed with code = " + result.getMutationCode() + ". Will use default failure policy instead.");
-                    throw new DoNotRetryIOException("Attemp to disable " + indexTableName + " failed.");
-                }
-                LOG.info("Successfully disabled index " + indexTableName + " due to an exception while writing updates.", cause);
-            }
+            if (results.isEmpty()) {
+                throw new IOException("Didn't get expected result size");
             }
-        } catch (Throwable t) {
-            LOG.warn("handleFailure failed", t);
-            super.handleFailure(attempted, cause);
-            throw new DoNotRetryIOException("Attemp to writes to " + indexTableNames + " failed.", cause);
+            MetaDataResponse tmpResponse = results.values().iterator().next();
+            MetaDataMutationResult result = MetaDataMutationResult.constructFromProto(tmpResponse);
+
+            if (result.getMutationCode() == MutationCode.TABLE_NOT_FOUND) {
+                LOG.info("Index " + indexTableName + " has been dropped. Ignore uncommitted mutations");
+                continue;
+            }
+            if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) {
+                LOG.warn("Attempt to disable index " + indexTableName + " failed with code = "
+                        + result.getMutationCode() + ". Will use default failure policy instead.");
+                throw new DoNotRetryIOException("Attempt to disable " + indexTableName + " failed.");
+            }
+            LOG.info("Successfully disabled index " + indexTableName + " due to an exception while writing updates.",
+                    cause);
         }
     }
 
+    private Collection<? extends String> getLocalIndexNames(HTableInterfaceReference ref,
+            Collection<Mutation> mutations) throws IOException {
+        Set<String> indexTableNames = new HashSet<String>(1);
+        PhoenixConnection conn = null;
+        try {
+            conn = QueryUtil.getConnection(this.env.getConfiguration()).unwrap(
+                    PhoenixConnection.class);
+            String userTableName = MetaDataUtil.getUserTableName(ref.getTableName());
+            PTable dataTable = PhoenixRuntime.getTable(conn, userTableName);
+            List<PTable> indexes = dataTable.getIndexes();
+            // local index used to get view id from index mutation row key.
+            PTable localIndex = null;
+            Map<ImmutableBytesWritable, String> localIndexNames =
+                    new HashMap<ImmutableBytesWritable, String>();
+            for (PTable index : indexes) {
+                if (index.getIndexType() == IndexType.LOCAL
+                        && index.getIndexState() == PIndexState.ACTIVE) {
+                    if (localIndex == null) localIndex = index;
+                    localIndexNames.put(new ImmutableBytesWritable(MetaDataUtil.getViewIndexIdDataType().toBytes(
+                            index.getViewIndexId())), index.getName().getString());
+                }
+            }
+            if (localIndex == null) {
+                return Collections.emptySet();
+            }
+
+            IndexMaintainer indexMaintainer = localIndex.getIndexMaintainer(dataTable);
+            HRegionInfo regionInfo = this.env.getRegion().getRegionInfo();
+            int offset =
+                    regionInfo.getStartKey().length == 0 ? regionInfo.getEndKey().length
+                            : regionInfo.getStartKey().length;
+            byte[] viewId = null;
+            for (Mutation mutation : mutations) {
+                viewId =
+                        indexMaintainer.getViewIndexIdFromIndexRowKey(
+                            new ImmutableBytesWritable(mutation.getRow(), offset,
+                                    mutation.getRow().length - offset));
+                String indexTableName = localIndexNames.get(new ImmutableBytesWritable(viewId));
+                indexTableNames.add(indexTableName);
+            }
+        } catch (ClassNotFoundException e) {
+            throw new IOException(e);
+        } catch (SQLException e) {
+            throw new IOException(e);
+        } finally {
+            if (conn != null) {
+                try {
+                    conn.close();
+                } catch (SQLException e) {
+                    throw new IOException(e);
+                }
+            }
+        }
+        return indexTableNames;
+    }
 }
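With this change, a failed index write leaves the index DISABLEd with an INDEX_DISABLE_TIMESTAMP recorded instead of aborting the region server, and if the index was dropped concurrently (the TABLE_NOT_FOUND branch above) the uncommitted index mutations are simply discarded. For an index that was disabled, the client-side follow-up is a rebuild. A short JDBC sketch, reusing the S.T/I names from the integration test above (the ZooKeeper quorum string is a placeholder; ALTER INDEX ... REBUILD is standard Phoenix DDL, not part of this commit):

import java.sql.Connection;
import java.sql.DriverManager;

public class RebuildIndexExample {
    public static void main(String[] args) throws Exception {
        // placeholder connection string; point it at your cluster's ZooKeeper quorum
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost:2181")) {
            // kicks off a rebuild of index I from data table S.T;
            // Phoenix marks the index ACTIVE again once the rebuild completes
            conn.createStatement().execute("ALTER INDEX I ON S.T REBUILD");
        }
    }
}

The recorded disable timestamp also feeds Phoenix's background rebuild path; the test's INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB setting controls how far before that timestamp the automatic replay starts.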