PHOENIX-3811 Do not disable index on write failure by default
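Summary of the change: the default handling of an index write failure no longer
transitions a mutable index to DISABLE. By default the index is left ACTIVE (see
the new LeaveIndexActiveFailurePolicy), the commit fails with a CommitException
that carries the server timestamp of the failure, and the client can replay the
failed mutations at that timestamp (UpsertCompiler and DeleteCompiler propagate
the IGNORE_NEWER_MUTATIONS scan attribute so that replayed writes still generate
correct index rows). Internally, the sign of INDEX_DISABLE_TIMESTAMP now
distinguishes the keep-index-active case (negative) from the disable /
block-writes case (positive), and the partial rebuilder keeps the oldest such
timestamp to drive the rebuild.

A hedged DDL sketch follows. Judging from the parameterized test matrix in
MutableIndexFailureIT, the pre-patch behavior can still be requested per table
through DDL properties (the literal property names below are assumed to match
the values of PhoenixIndexFailurePolicy.DISABLE_INDEX_ON_WRITE_FAILURE and
REBUILD_INDEX_ON_WRITE_FAILURE), or cluster-wide via the new
QueryServices.INDEX_FAILURE_DISABLE_INDEX server-side setting:

    CREATE TABLE my_table (k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)
        DISABLE_INDEX_ON_WRITE_FAILURE=true, REBUILD_INDEX_ON_WRITE_FAILURE=true;

Below is a minimal client-side sketch of the replay path, modeled on the new
replayMutations() helper in MutableIndexFailureIT further down in this diff. The
JDBC URL and the writeBatch() helper are placeholders, not part of this patch;
only CommitException.getServerTimestamp() and PhoenixRuntime.REPLAY_AT_ATTRIB
come from the change itself:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.SQLException;
    import java.util.Properties;

    import org.apache.phoenix.execute.CommitException;
    import org.apache.phoenix.util.PhoenixRuntime;

    public class ReplayOnIndexWriteFailure {
        private static final String URL = "jdbc:phoenix:localhost"; // placeholder

        static void writeWithReplay() throws SQLException {
            try (Connection conn = DriverManager.getConnection(URL)) {
                conn.setAutoCommit(false);
                writeBatch(conn);
                conn.commit();
            } catch (CommitException e) {
                // With the index left ACTIVE on failure, replay the same batch
                // at the server timestamp at which the commit failed.
                Properties props = new Properties();
                props.setProperty(PhoenixRuntime.REPLAY_AT_ATTRIB,
                        Long.toString(e.getServerTimestamp()));
                try (Connection conn = DriverManager.getConnection(URL, props)) {
                    conn.setAutoCommit(false);
                    writeBatch(conn); // placeholder: re-issue the failed upserts/deletes
                    conn.commit();
                }
            }
        }

        static void writeBatch(Connection conn) throws SQLException {
            // application-specific upserts/deletes
        }
    }

Note on the replay path: because replayed mutations run at an earlier timestamp
than data already committed to the data table, the IGNORE_NEWER_MUTATIONS
attribute set in UpsertCompiler/DeleteCompiler and honored in
UngroupedAggregateRegionObserver keeps future-dated data row mutations from
interfering with index row generation during the replay.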
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b1ddaa2b Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b1ddaa2b Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b1ddaa2b Branch: refs/heads/4.x-HBase-1.1 Commit: b1ddaa2b863ee58674053819632fddc13f0dbce1 Parents: 9838dcf Author: James Taylor <[email protected]> Authored: Wed May 10 09:52:23 2017 -0700 Committer: James Taylor <[email protected]> Committed: Thu May 11 17:39:09 2017 -0700 ---------------------------------------------------------------------- .../phoenix/end2end/AutomaticRebuildIT.java | 221 -------------- .../end2end/IndexToolForPartialBuildIT.java | 15 +- ...olForPartialBuildWithNamespaceEnabledIT.java | 15 +- .../end2end/index/MutableIndexFailureIT.java | 276 ++++++++++++------ .../end2end/index/ReadOnlyIndexFailureIT.java | 291 ------------------- .../apache/phoenix/compile/DeleteCompiler.java | 5 + .../apache/phoenix/compile/UpsertCompiler.java | 4 + .../coprocessor/MetaDataEndpointImpl.java | 6 +- .../coprocessor/MetaDataRegionObserver.java | 44 ++- .../UngroupedAggregateRegionObserver.java | 7 + .../phoenix/exception/SQLExceptionCode.java | 2 + .../apache/phoenix/execute/CommitException.java | 8 +- .../apache/phoenix/execute/MutationState.java | 13 +- .../phoenix/hbase/index/write/IndexWriter.java | 4 + .../write/LeaveIndexActiveFailurePolicy.java | 62 ++++ .../index/PhoenixIndexFailurePolicy.java | 82 +++++- .../index/PhoenixTransactionalIndexer.java | 5 +- .../apache/phoenix/jdbc/PhoenixConnection.java | 44 ++- .../phoenix/mapreduce/index/IndexTool.java | 12 +- .../query/ConnectionQueryServicesImpl.java | 2 +- .../org/apache/phoenix/query/QueryServices.java | 3 +- .../phoenix/query/QueryServicesOptions.java | 3 +- .../apache/phoenix/schema/MetaDataClient.java | 7 +- .../java/org/apache/phoenix/util/IndexUtil.java | 12 +- .../java/org/apache/phoenix/util/JDBCUtil.java | 5 + .../org/apache/phoenix/util/PhoenixRuntime.java | 14 + .../org/apache/phoenix/util/ServerUtil.java | 37 +++ .../hbase/index/write/TestIndexWriter.java | 6 + 28 files changed, 525 insertions(+), 680 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/b1ddaa2b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AutomaticRebuildIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AutomaticRebuildIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AutomaticRebuildIT.java deleted file mode 100644 index 25cab35..0000000 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AutomaticRebuildIT.java +++ /dev/null @@ -1,221 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.phoenix.end2end; - -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE; -import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.Arrays; -import java.util.Collection; -import java.util.Map; -import java.util.Properties; -import java.util.Set; - -import org.apache.hadoop.hbase.DoNotRetryIOException; -import org.apache.hadoop.hbase.HBaseIOException; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.client.Mutation; -import org.apache.hadoop.hbase.coprocessor.ObserverContext; -import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver; -import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; -import org.apache.phoenix.query.QueryConstants; -import org.apache.phoenix.query.QueryServices; -import org.apache.phoenix.query.QueryServicesOptions; -import org.apache.phoenix.schema.PIndexState; -import org.apache.phoenix.schema.PTableType; -import org.apache.phoenix.util.PropertiesUtil; -import org.apache.phoenix.util.ReadOnlyProps; -import org.apache.phoenix.util.SchemaUtil; -import org.apache.phoenix.util.StringUtil; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; - -import com.google.common.collect.Maps; - -/** - * Tests for the {@link AutomaticRebuildIT} - */ -@RunWith(Parameterized.class) -public class AutomaticRebuildIT extends BaseOwnClusterIT { - - private final boolean localIndex; - protected boolean isNamespaceEnabled = false; - protected final String tableDDLOptions; - - public AutomaticRebuildIT(boolean localIndex) { - this.localIndex = localIndex; - StringBuilder optionBuilder = new StringBuilder(); - optionBuilder.append(" SPLIT ON(1,2)"); - this.tableDDLOptions = optionBuilder.toString(); - } - - @BeforeClass - public static void doSetup() throws Exception { - Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(7); - serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS); - serverProps.put("hbase.coprocessor.region.classes", FailingRegionObserver.class.getName()); - serverProps.put(" yarn.scheduler.capacity.maximum-am-resource-percent", "1.0"); - serverProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2"); - serverProps.put(HConstants.HBASE_RPC_TIMEOUT_KEY, "10000"); - serverProps.put("hbase.client.pause", "5000"); - serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_PERIOD, "1000"); - serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_NUMBER_OF_BATCHES_PER_TABLE, "5"); - Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(1); - setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), - new 
ReadOnlyProps(clientProps.entrySet().iterator())); - } - - @Parameters(name = "localIndex = {0}") - public static Collection<Boolean[]> data() { - return Arrays.asList(new Boolean[][] { { false }, { true } }); - } - - @Test - public void testSecondaryAutomaticRebuildIndex() throws Exception { - String schemaName = generateUniqueName(); - String dataTableName = generateUniqueName(); - String fullTableName = SchemaUtil.getTableName(schemaName, dataTableName); - final String indxTable = String.format("%s_%s", dataTableName, FailingRegionObserver.INDEX_NAME); - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - props.setProperty(QueryServices.TRANSACTIONS_ENABLED, Boolean.TRUE.toString()); - props.setProperty(QueryServices.EXPLAIN_ROW_COUNT_ATTRIB, Boolean.FALSE.toString()); - props.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(isNamespaceEnabled)); - final Connection conn = DriverManager.getConnection(getUrl(), props); - Statement stmt = conn.createStatement(); - try { - if (isNamespaceEnabled) { - conn.createStatement().execute("CREATE SCHEMA IF NOT EXISTS " + schemaName); - } - stmt.execute(String.format( - "CREATE TABLE %s (ID BIGINT NOT NULL, NAME VARCHAR, ZIP INTEGER CONSTRAINT PK PRIMARY KEY(ID ROW_TIMESTAMP)) %s", - fullTableName, tableDDLOptions)); - String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", fullTableName); - PreparedStatement stmt1 = conn.prepareStatement(upsertQuery); - FailingRegionObserver.FAIL_WRITE = false; - // insert two rows - upsertRow(stmt1, 1000); - upsertRow(stmt1, 2000); - - conn.commit(); - stmt.execute(String.format("CREATE %s INDEX %s ON %s (LPAD(UPPER(NAME),11,'x')||'_xyz') ", - (localIndex ? "LOCAL" : ""), indxTable, fullTableName)); - FailingRegionObserver.FAIL_WRITE = true; - upsertRow(stmt1, 3000); - upsertRow(stmt1, 4000); - upsertRow(stmt1, 5000); - try { - conn.commit(); - fail(); - } catch (SQLException e) { - } catch (Exception e) { - } - FailingRegionObserver.FAIL_WRITE = false; - ResultSet rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schemaName), indxTable, - new String[] { PTableType.INDEX.toString() }); - assertTrue(rs.next()); - assertEquals(indxTable, rs.getString(3)); - String indexState = rs.getString("INDEX_STATE"); - assertEquals(PIndexState.DISABLE.toString(), indexState); - assertFalse(rs.next()); - upsertRow(stmt1, 6000); - upsertRow(stmt1, 7000); - conn.commit(); - int maxTries = 4, nTries = 0; - boolean isInactive = false; - do { - rs = conn.createStatement() - .executeQuery(String.format("SELECT " + PhoenixDatabaseMetaData.INDEX_STATE + "," - + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP + " FROM " - +"\""+ SYSTEM_CATALOG_SCHEMA + "\"." 
+ SYSTEM_CATALOG_TABLE + " (" - + PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP + " bigint) where " - + PhoenixDatabaseMetaData.TABLE_SCHEM + "='" + schemaName + "' and " - + PhoenixDatabaseMetaData.TABLE_NAME + "='" + indxTable + "'")); - rs.next(); - if (PIndexState.INACTIVE.getSerializedValue().equals(rs.getString(1)) && rs.getLong(2) > 3000) { - isInactive = true; - break; - } - Thread.sleep(10 * 1000); // sleep 10 secs - } while (++nTries < maxTries); - assertTrue(isInactive); - nTries = 0; - boolean isActive = false; - do { - Thread.sleep(15 * 1000); // sleep 15 secs - rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schemaName), indxTable, - new String[] { PTableType.INDEX.toString() }); - assertTrue(rs.next()); - if (PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))) { - isActive = true; - break; - } - } while (++nTries < maxTries); - assertTrue(isActive); - - } finally { - conn.close(); - } - } - - public static void upsertRow(PreparedStatement stmt, int i) throws SQLException { - // insert row - stmt.setInt(1, i); - stmt.setString(2, "uname" + String.valueOf(i)); - stmt.setInt(3, 95050 + i); - stmt.executeUpdate(); - } - - public static class FailingRegionObserver extends SimpleRegionObserver { - public static volatile boolean FAIL_WRITE = false; - public static final String INDEX_NAME = "IDX"; - - @Override - public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, - MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException { - if (c.getEnvironment().getRegionInfo().getTable().getNameAsString().contains(INDEX_NAME) && FAIL_WRITE) { - throw new DoNotRetryIOException(); - } - Mutation operation = miniBatchOp.getOperation(0); - Set<byte[]> keySet = operation.getFamilyMap().keySet(); - for (byte[] family : keySet) { - if (Bytes.toString(family).startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX) && FAIL_WRITE) { - throw new DoNotRetryIOException(); - } - } - } - - } - -} http://git-wip-us.apache.org/repos/asf/phoenix/blob/b1ddaa2b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java index 599e601..59a9106 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java @@ -89,9 +89,8 @@ public class IndexToolForPartialBuildIT extends BaseOwnClusterIT { this.tableDDLOptions = optionBuilder.toString(); } - @BeforeClass - public static void doSetup() throws Exception { - Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(7); + public static Map<String, String> getServerProperties() { + Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(10); serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS); serverProps.put("hbase.coprocessor.region.classes", FailingRegionObserver.class.getName()); serverProps.put(" yarn.scheduler.capacity.maximum-am-resource-percent", "1.0"); @@ -99,8 +98,14 @@ public class IndexToolForPartialBuildIT extends BaseOwnClusterIT { serverProps.put(HConstants.HBASE_RPC_TIMEOUT_KEY, "10000"); serverProps.put("hbase.client.pause", "5000"); serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB, 
Boolean.FALSE.toString()); - Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(1); - setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator())); + serverProps.put(QueryServices.INDEX_FAILURE_DISABLE_INDEX, Boolean.TRUE.toString()); + return serverProps; + } + + @BeforeClass + public static void doSetup() throws Exception { + Map<String, String> serverProps = getServerProperties(); + setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), ReadOnlyProps.EMPTY_PROPS); } @Parameters(name="localIndex = {0}") http://git-wip-us.apache.org/repos/asf/phoenix/blob/b1ddaa2b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildWithNamespaceEnabledIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildWithNamespaceEnabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildWithNamespaceEnabledIT.java index 4b2371c..a8c1f1e 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildWithNamespaceEnabledIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildWithNamespaceEnabledIT.java @@ -21,13 +21,9 @@ import java.util.Arrays; import java.util.Collection; import java.util.Map; -import org.apache.hadoop.hbase.HConstants; import org.apache.phoenix.query.QueryServices; -import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.util.ReadOnlyProps; import org.junit.BeforeClass; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; import com.google.common.collect.Maps; @@ -35,7 +31,6 @@ import com.google.common.collect.Maps; /** * Tests for the {@link IndexToolForPartialBuildWithNamespaceEnabled} */ -@RunWith(Parameterized.class) public class IndexToolForPartialBuildWithNamespaceEnabledIT extends IndexToolForPartialBuildIT { @@ -45,15 +40,9 @@ public class IndexToolForPartialBuildWithNamespaceEnabledIT extends IndexToolFor } @BeforeClass + @Shadower(classBeingShadowed = IndexToolForPartialBuildIT.class) public static void doSetup() throws Exception { - Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(7); - serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS); - serverProps.put("hbase.coprocessor.region.classes", FailingRegionObserver.class.getName()); - serverProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2"); - serverProps.put(HConstants.HBASE_RPC_TIMEOUT_KEY, "10000"); - serverProps.put("hbase.client.pause", "5000"); - serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_PERIOD, "2000"); - serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB, "1000"); + Map<String, String> serverProps = getServerProperties(); serverProps.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, "true"); Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(1); clientProps.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, "true"); http://git-wip-us.apache.org/repos/asf/phoenix/blob/b1ddaa2b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java index d07c8fa..075f799 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java @@ -24,16 +24,17 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.sql.Connection; +import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.Set; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.HConstants; @@ -44,25 +45,33 @@ import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver; import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; +import org.apache.phoenix.execute.CommitException; +import org.apache.phoenix.index.PhoenixIndexFailurePolicy; +import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.query.BaseTest; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.PIndexState; import org.apache.phoenix.schema.PTableType; +import org.apache.phoenix.schema.SortOrder; +import org.apache.phoenix.util.MetaDataUtil; +import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.StringUtil; -import org.apache.phoenix.util.TestUtil; import org.junit.AfterClass; import org.junit.BeforeClass; +import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; /** * @@ -72,38 +81,47 @@ import com.google.common.collect.Maps; * */ +@Ignore("Not working for HBase 1.1") @Category(NeedsOwnMiniClusterTest.class) @RunWith(Parameterized.class) public class MutableIndexFailureIT extends BaseTest { - public static volatile boolean FAIL_WRITE = false; public static final String INDEX_NAME = "IDX"; + public static final String TABLE_NAME = "T"; + + public static volatile boolean FAIL_WRITE = false; + public static volatile String fullTableName; private String tableName; private String indexName; - private String fullTableName; private String fullIndexName; private final boolean transactional; private final boolean localIndex; private final String tableDDLOptions; private final boolean isNamespaceMapped; + private final boolean leaveIndexActiveOnFailure; + private final boolean rebuildIndexOnWriteFailure; private String schema = generateUniqueName(); + private List<CommitException> exceptions = Lists.newArrayList(); @AfterClass public static void doTeardown() throws Exception { tearDownMiniCluster(); } - public MutableIndexFailureIT(boolean transactional, boolean localIndex, boolean isNamespaceMapped) { + 
public MutableIndexFailureIT(boolean transactional, boolean localIndex, boolean isNamespaceMapped, Boolean disableIndexOnWriteFailure, Boolean rebuildIndexOnWriteFailure) { this.transactional = transactional; this.localIndex = localIndex; - this.tableDDLOptions = transactional ? " TRANSACTIONAL=true " : ""; - this.tableName = (localIndex ? "L_" : "") + TestUtil.DEFAULT_DATA_TABLE_NAME + (transactional ? "_TXN" : "") - + (isNamespaceMapped ? "_NM" : ""); - this.indexName = INDEX_NAME; - this.fullTableName = SchemaUtil.getTableName(schema, tableName); + this.tableDDLOptions = " SALT_BUCKETS=2 " + (transactional ? ", TRANSACTIONAL=true " : "") + + (disableIndexOnWriteFailure == null ? "" : (", " + PhoenixIndexFailurePolicy.DISABLE_INDEX_ON_WRITE_FAILURE + "=" + disableIndexOnWriteFailure)) + + (rebuildIndexOnWriteFailure == null ? "" : (", " + PhoenixIndexFailurePolicy.REBUILD_INDEX_ON_WRITE_FAILURE + "=" + rebuildIndexOnWriteFailure)); + this.tableName = FailingRegionObserver.FAIL_TABLE_NAME; + this.indexName = "A_" + FailingRegionObserver.FAIL_INDEX_NAME; + fullTableName = SchemaUtil.getTableName(schema, tableName); this.fullIndexName = SchemaUtil.getTableName(schema, indexName); this.isNamespaceMapped = isNamespaceMapped; + this.leaveIndexActiveOnFailure = ! (disableIndexOnWriteFailure == null ? QueryServicesOptions.DEFAULT_INDEX_FAILURE_DISABLE_INDEX : disableIndexOnWriteFailure); + this.rebuildIndexOnWriteFailure = Boolean.TRUE.equals(rebuildIndexOnWriteFailure); } @BeforeClass @@ -115,16 +133,30 @@ public class MutableIndexFailureIT extends BaseTest { serverProps.put("hbase.client.pause", "5000"); serverProps.put("data.tx.snapshot.dir", "/tmp"); serverProps.put("hbase.balancer.period", String.valueOf(Integer.MAX_VALUE)); - Map<String, String> clientProps = Collections.singletonMap(QueryServices.TRANSACTIONS_ENABLED, "true"); + serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB, Boolean.TRUE.toString()); + serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB, "4000"); + Map<String, String> clientProps = Collections.singletonMap(QueryServices.TRANSACTIONS_ENABLED, Boolean.TRUE.toString()); NUM_SLAVES_BASE = 4; setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator())); } - @Parameters(name = "MutableIndexFailureIT_transactional={0},localIndex={1},isNamespaceMapped={2}") // name is used by failsafe as file name in reports - public static Collection<Boolean[]> data() { - return Arrays.asList(new Boolean[][] { { false, false, true }, { false, false, false }, { false, true, true }, - { false, true, false }, { true, false, true }, { true, true, true }, { true, false, false }, - { true, true, false } }); + @Parameters(name = "MutableIndexFailureIT_transactional={0},localIndex={1},isNamespaceMapped={2},disableIndexOnWriteFailure={3},rebuildIndexOnWriteFailure={4}") // name is used by failsafe as file name in reports + public static List<Object[]> data() { + return Arrays.asList(new Object[][] { + { false, false, true, true, true }, + { false, false, false, true, true }, + { true, false, false, true, true }, + { true, false, true, true, true }, + { false, true, true, true, true }, + { false, true, false, true, true }, + { true, true, false, true, true }, + { true, true, true, true, true }, + + { false, false, false, null, true }, + { false, true, false, false, true }, + { false, false, false, false, null }, + } + ); } @Test @@ -133,9 +165,9 @@ public class MutableIndexFailureIT extends 
BaseTest { } public void helpTestWriteFailureDisablesIndex() throws Exception { - String secondTableName = fullTableName + "_2"; - String secondIndexName = indexName + "_2"; - String secondFullIndexName = fullIndexName + "_2"; + String secondIndexName = "B_" + FailingRegionObserver.FAIL_INDEX_NAME; +// String thirdIndexName = "C_" + INDEX_NAME; +// String thirdFullIndexName = SchemaUtil.getTableName(schema, thirdIndexName); Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); props.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, String.valueOf(isNamespaceMapped)); try (Connection conn = driver.connect(url, props)) { @@ -147,24 +179,26 @@ public class MutableIndexFailureIT extends BaseTest { } conn.createStatement().execute("CREATE TABLE " + fullTableName + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) " + tableDDLOptions); - conn.createStatement().execute("CREATE TABLE " + secondTableName - + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) " + tableDDLOptions); query = "SELECT * FROM " + fullTableName; rs = conn.createStatement().executeQuery(query); assertFalse(rs.next()); FailingRegionObserver.FAIL_WRITE = false; conn.createStatement().execute( - "CREATE " + (localIndex ? "LOCAL " : "") + "INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE (v2)"); + "CREATE " + (localIndex ? "LOCAL " : "") + " INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE (v2)"); + // Create other index which should be local/global if the other index is global/local to + // check the drop index. conn.createStatement().execute( - "CREATE " + (localIndex ? "LOCAL " : "") + "INDEX " + secondIndexName + " ON " + secondTableName + " (v1) INCLUDE (v2)"); + "CREATE " + (!localIndex ? "LOCAL " : "") + " INDEX " + secondIndexName + " ON " + fullTableName + " (v2) INCLUDE (v1)"); +// conn.createStatement().execute( +// "CREATE " + (localIndex ? "LOCAL " : "") + " INDEX " + thirdIndexName + " ON " + fullTableName + " (v1) INCLUDE (v2)"); query = "SELECT * FROM " + fullIndexName; rs = conn.createStatement().executeQuery(query); assertFalse(rs.next()); // Verify the metadata for index is correct. 
- rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schema), indexName+"%", + rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schema), null, new String[] { PTableType.INDEX.toString() }); assertTrue(rs.next()); assertEquals(indexName, rs.getString(3)); @@ -172,14 +206,15 @@ public class MutableIndexFailureIT extends BaseTest { assertTrue(rs.next()); assertEquals(secondIndexName, rs.getString(3)); assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE")); - assertFalse(rs.next()); +// assertTrue(rs.next()); +// assertEquals(thirdIndexName, rs.getString(3)); +// assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE")); initializeTable(conn, fullTableName); - initializeTable(conn, secondTableName); query = "SELECT /*+ NO_INDEX */ k,v1 FROM " + fullTableName; rs = conn.createStatement().executeQuery("EXPLAIN " + query); - String expectedPlan = "CLIENT PARALLEL 1-WAY FULL SCAN OVER " - + SchemaUtil.getPhysicalTableName(fullTableName.getBytes(), isNamespaceMapped); + String expectedPlan = "CLIENT PARALLEL 2-WAY FULL SCAN OVER " + + SchemaUtil.getPhysicalTableName(fullTableName.getBytes(), isNamespaceMapped)+"\nCLIENT MERGE SORT"; assertEquals(expectedPlan, QueryUtil.getExplainPlan(rs)); rs = conn.createStatement().executeQuery(query); assertTrue(rs.next()); @@ -194,15 +229,14 @@ public class MutableIndexFailureIT extends BaseTest { assertFalse(rs.next()); FailingRegionObserver.FAIL_WRITE = true; - updateTable(conn, fullTableName); - updateTable(conn, secondTableName); + updateTable(conn, true); // Verify the metadata for index is correct. - rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schema), indexName, + rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schema), StringUtil.escapeLike(indexName), new String[] { PTableType.INDEX.toString() }); assertTrue(rs.next()); assertEquals(indexName, rs.getString(3)); // the index is only disabled for non-txn tables upon index table write failure - if (transactional) { + if (transactional || leaveIndexActiveOnFailure) { assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE")); } else { String indexState = rs.getString("INDEX_STATE"); @@ -214,24 +248,12 @@ public class MutableIndexFailureIT extends BaseTest { // in an all or none manner. If the table is not transactional, then the data writes // would have succeeded while the index writes would have failed. 
if (!transactional) { - // Verify UPSERT on data table still work after index is disabled - PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)"); - stmt.setString(1, "a3"); - stmt.setString(2, "x3"); - stmt.setString(3, "3"); - stmt.execute(); - conn.commit(); - stmt = conn.prepareStatement("UPSERT INTO " + secondTableName + " VALUES(?,?,?)"); - stmt.setString(1, "a3"); - stmt.setString(2, "x3"); - stmt.setString(3, "3"); - stmt.execute(); - conn.commit(); + updateTableAgain(conn, leaveIndexActiveOnFailure); // Verify previous writes succeeded to data table query = "SELECT /*+ NO_INDEX */ k,v1 FROM " + fullTableName; rs = conn.createStatement().executeQuery("EXPLAIN " + query); - expectedPlan = "CLIENT PARALLEL 1-WAY FULL SCAN OVER " - + SchemaUtil.getPhysicalTableName(fullTableName.getBytes(), isNamespaceMapped); + expectedPlan = "CLIENT PARALLEL 2-WAY FULL SCAN OVER " + + SchemaUtil.getPhysicalTableName(fullTableName.getBytes(), isNamespaceMapped)+"\nCLIENT MERGE SORT"; assertEquals(expectedPlan, QueryUtil.getExplainPlan(rs)); rs = conn.createStatement().executeQuery(query); assertTrue(rs.next()); @@ -248,13 +270,20 @@ public class MutableIndexFailureIT extends BaseTest { assertEquals("d", rs.getString(2)); assertFalse(rs.next()); } + // Comment back in when PHOENIX-3815 is fixed +// validateDataWithIndex(conn, fullTableName, thirdFullIndexName, false); // re-enable index table FailingRegionObserver.FAIL_WRITE = false; - waitForIndexToBeActive(conn,indexName); - waitForIndexToBeActive(conn,secondIndexName); + if (rebuildIndexOnWriteFailure) { + // wait for index to be rebuilt automatically + waitForIndexToBeRebuilt(conn,indexName); + } else { + // simulate replaying failed mutation + replayMutations(); + } - // Verify UPSERT on data table still work after index table is recreated + // Verify UPSERT on data table still works after index table is recreated PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)"); stmt.setString(1, "a3"); stmt.setString(2, "x4"); @@ -262,31 +291,26 @@ public class MutableIndexFailureIT extends BaseTest { stmt.execute(); conn.commit(); - stmt = conn.prepareStatement("UPSERT INTO " + secondTableName + " VALUES(?,?,?)"); - stmt.setString(1, "a3"); - stmt.setString(2, "x4"); - stmt.setString(3, "4"); - stmt.execute(); - conn.commit(); - - // verify index table has correct data - validateDataWithIndex(conn, fullTableName, fullIndexName); - validateDataWithIndex(conn, secondTableName, secondFullIndexName); + // verify index table has correct data (note that second index has been dropped) + validateDataWithIndex(conn, fullTableName, fullIndexName, localIndex); } finally { FAIL_WRITE = false; } } - private void waitForIndexToBeActive(Connection conn, String index) throws InterruptedException, SQLException { + private void waitForIndexToBeRebuilt(Connection conn, String index) throws InterruptedException, SQLException { boolean isActive = false; if (!transactional) { - int maxTries = 4, nTries = 0; + int maxTries = 12, nTries = 0; do { - Thread.sleep(15 * 1000); // sleep 15 secs - ResultSet rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schema), index, - new String[] { PTableType.INDEX.toString() }); + Thread.sleep(5 * 1000); // sleep 5 secs + String query = "SELECT CAST(" + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP + " AS BIGINT) FROM " + + PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME + " WHERE (" + PhoenixDatabaseMetaData.TABLE_SCHEM + "," + 
PhoenixDatabaseMetaData.TABLE_NAME + + ") = (" + "'" + schema + "','" + index + "') " + + "AND " + PhoenixDatabaseMetaData.COLUMN_FAMILY + " IS NULL AND " + PhoenixDatabaseMetaData.COLUMN_NAME + " IS NULL"; + ResultSet rs = conn.createStatement().executeQuery(query); assertTrue(rs.next()); - if (PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))) { + if (rs.getLong(1) == 0 && !rs.wasNull()) { isActive = true; break; } @@ -313,16 +337,16 @@ public class MutableIndexFailureIT extends BaseTest { } - private void validateDataWithIndex(Connection conn, String tableName, String indexName) throws SQLException { - String query = "SELECT /*+ INDEX(" + indexName + ") */ k,v1 FROM " + tableName; + private void validateDataWithIndex(Connection conn, String fullTableName, String fullIndexName, boolean localIndex) throws SQLException { + String query = "SELECT /*+ INDEX(" + fullTableName + " " + SchemaUtil.getTableNameFromFullName(fullIndexName) + ") */ k,v1 FROM " + fullTableName; ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + query); String expectedPlan = " OVER " + (localIndex ? Bytes.toString( - SchemaUtil.getPhysicalTableName(tableName.getBytes(), isNamespaceMapped).getName()) - : SchemaUtil.getPhysicalTableName(indexName.getBytes(), isNamespaceMapped).getNameAsString()); + SchemaUtil.getPhysicalTableName(fullTableName.getBytes(), isNamespaceMapped).getName()) + : SchemaUtil.getPhysicalTableName(fullIndexName.getBytes(), isNamespaceMapped).getNameAsString()); String explainPlan = QueryUtil.getExplainPlan(rs); - assertTrue(explainPlan.contains(expectedPlan)); + assertTrue(explainPlan, explainPlan.contains(expectedPlan)); rs = conn.createStatement().executeQuery(query); if (transactional) { // failed commit does not get retried assertTrue(rs.next()); @@ -355,8 +379,26 @@ public class MutableIndexFailureIT extends BaseTest { } } - private void updateTable(Connection conn, String tableName) throws SQLException { - PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES(?,?,?)"); + private void replayMutations() throws SQLException { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + for (int i = 0; i < exceptions.size(); i++) { + CommitException e = exceptions.get(i); + long ts = e.getServerTimestamp(); + props.setProperty(PhoenixRuntime.REPLAY_AT_ATTRIB, Long.toString(ts)); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + if (i == 0) { + updateTable(conn, false); + } else if (i == 1) { + updateTableAgain(conn, false); + } else { + fail(); + } + } + } + } + + private void updateTable(Connection conn, boolean commitShouldFail) throws SQLException { + PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)"); // Insert new row stmt.setString(1, "d"); stmt.setString(2, "d"); @@ -368,36 +410,92 @@ public class MutableIndexFailureIT extends BaseTest { stmt.setString(3, "2"); stmt.execute(); // Delete existing row - stmt = conn.prepareStatement("DELETE FROM " + tableName + " WHERE k=?"); + stmt = conn.prepareStatement("DELETE FROM " + fullTableName + " WHERE k=?"); stmt.setString(1, "b"); stmt.execute(); try { conn.commit(); - fail(); - } catch (SQLException e) { - System.out.println(); - } catch (Exception e) { - System.out.println(); + if (commitShouldFail) { + fail(); + } + } catch (CommitException e) { + if (!commitShouldFail) { + throw e; + } + exceptions.add(e); } } + private void updateTableAgain(Connection conn, boolean commitShouldFail) throws SQLException { + 
// Verify UPSERT on data table still work after index is disabled + PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)"); + stmt.setString(1, "a3"); + stmt.setString(2, "x3"); + stmt.setString(3, "3"); + stmt.execute(); + try { + conn.commit(); + if (commitShouldFail) { + fail(); + } + } catch (CommitException e) { + if (!commitShouldFail) { + throw e; + } + exceptions.add(e); + } + } + public static class FailingRegionObserver extends SimpleRegionObserver { public static volatile boolean FAIL_WRITE = false; - public static final String INDEX_NAME = "IDX"; + public static final String FAIL_INDEX_NAME = "FAIL_IDX"; + public static final String FAIL_TABLE_NAME = "FAIL_TABLE"; + @Override public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException { - if (c.getEnvironment().getRegionInfo().getTable().getNameAsString().contains(INDEX_NAME) && FAIL_WRITE) { - throw new DoNotRetryIOException(); - } - Mutation operation = miniBatchOp.getOperation(0); - Set<byte[]> keySet = operation.getFamilyMap().keySet(); - for(byte[] family: keySet) { - if(Bytes.toString(family).startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX) && FAIL_WRITE) { - throw new DoNotRetryIOException(); + boolean throwException = false; + if (c.getEnvironment().getRegionInfo().getTable().getNameAsString().endsWith("A_" + FAIL_INDEX_NAME) + && FAIL_WRITE) { + throwException = true; + } else { + // When local index updates are atomic with data updates, testing a write failure to a local + // index won't make sense. + Mutation operation = miniBatchOp.getOperation(0); + if (FAIL_WRITE) { + Map<byte[],List<Cell>>cellMap = operation.getFamilyCellMap(); + for (Map.Entry<byte[],List<Cell>> entry : cellMap.entrySet()) { + byte[] family = entry.getKey(); + if (Bytes.toString(family).startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX)) { + int regionStartKeyLen = c.getEnvironment().getRegionInfo().getStartKey().length; + Cell firstCell = entry.getValue().get(0); + short indexId = MetaDataUtil.getViewIndexIdDataType().getCodec().decodeShort(firstCell.getRowArray(), firstCell.getRowOffset() + regionStartKeyLen, SortOrder.getDefault()); + // Only throw for first local index as the test may have multiple local indexes + if (indexId == Short.MIN_VALUE) { + throwException = true; + break; + } + } + } } } + if (throwException) { + dropIndex(c); + throw new DoNotRetryIOException(); + } } + + private void dropIndex(ObserverContext<RegionCoprocessorEnvironment> c) { + try { + Connection connection = + QueryUtil.getConnection(c.getEnvironment().getConfiguration()); + connection.createStatement().execute( + "DROP INDEX IF EXISTS " + "B_" + FAIL_INDEX_NAME + " ON " + + fullTableName); + } catch (ClassNotFoundException e) { + } catch (SQLException e) { + } + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/b1ddaa2b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReadOnlyIndexFailureIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReadOnlyIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReadOnlyIndexFailureIT.java deleted file mode 100644 index cf3cb29..0000000 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReadOnlyIndexFailureIT.java +++ /dev/null @@ -1,291 +0,0 @@ -/* - * Licensed to the Apache Software 
Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.phoenix.end2end.index; - -import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.Arrays; -import java.util.Collection; -import java.util.Map; -import java.util.Properties; -import java.util.Set; - -import org.apache.hadoop.hbase.DoNotRetryIOException; -import org.apache.hadoop.hbase.HBaseIOException; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.client.Mutation; -import org.apache.hadoop.hbase.coprocessor.ObserverContext; -import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver; -import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.phoenix.end2end.BaseOwnClusterIT; -import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; -import org.apache.phoenix.exception.SQLExceptionCode; -import org.apache.phoenix.hbase.index.Indexer; -import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; -import org.apache.phoenix.query.QueryConstants; -import org.apache.phoenix.query.QueryServices; -import org.apache.phoenix.schema.PIndexState; -import org.apache.phoenix.schema.PTableType; -import org.apache.phoenix.util.PropertiesUtil; -import org.apache.phoenix.util.ReadOnlyProps; -import org.apache.phoenix.util.SchemaUtil; -import org.apache.phoenix.util.StringUtil; -import org.apache.phoenix.util.TestUtil; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; - -import com.google.common.collect.Maps; -/** - * - * Test for failure of region server to write to index table. - * For some reason dropping tables after running this test - * fails unless it runs its own mini cluster. - * - * - * @since 2.1 - */ - -@Category(NeedsOwnMiniClusterTest.class) -@RunWith(Parameterized.class) -public class ReadOnlyIndexFailureIT extends BaseOwnClusterIT { - public static volatile boolean FAIL_WRITE = false; - public static final String INDEX_NAME = "IDX"; - - private String tableName; - private String indexName; - private String fullTableName; - private String fullIndexName; - private final boolean localIndex; - - public ReadOnlyIndexFailureIT(boolean localIndex) { - this.localIndex = localIndex; - this.tableName = (localIndex ? 
"L_" : "") + TestUtil.DEFAULT_DATA_TABLE_NAME; - this.indexName = INDEX_NAME; - this.fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName); - this.fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName); - } - - @Parameters(name = "ReadOnlyIndexFailureIT_localIndex={0}") // name is used by failsafe as file name in reports - public static Collection<Boolean[]> data() { - return Arrays.asList(new Boolean[][] { { false }, { true } }); - } - - @BeforeClass - public static void doSetup() throws Exception { - Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(10); - serverProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2"); - serverProps.put(HConstants.HBASE_RPC_TIMEOUT_KEY, "10000"); - serverProps.put("hbase.client.pause", "5000"); - serverProps.put("hbase.balancer.period", String.valueOf(Integer.MAX_VALUE)); - serverProps.put(QueryServices.INDEX_FAILURE_BLOCK_WRITE, "true"); - serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB, "true"); - serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB, "1000"); - serverProps.put("hbase.coprocessor.region.classes", FailingRegionObserver.class.getName()); - serverProps.put("hbase.coprocessor.abortonerror", "false"); - serverProps.put(Indexer.CHECK_VERSION_CONF_KEY, "false"); - NUM_SLAVES_BASE = 4; - setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), - ReadOnlyProps.EMPTY_PROPS); - } - - @Test - public void testWriteFailureReadOnlyIndex() throws Exception { - helpTestWriteFailureReadOnlyIndex(); - } - - public void helpTestWriteFailureReadOnlyIndex() throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - try (Connection conn = driver.connect(url, props)) { - String query; - ResultSet rs; - conn.setAutoCommit(false); - conn.createStatement().execute( - "CREATE TABLE " + fullTableName + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"); - query = "SELECT * FROM " + fullTableName; - rs = conn.createStatement().executeQuery(query); - assertFalse(rs.next()); - - FAIL_WRITE = false; - if(localIndex) { - conn.createStatement().execute( - "CREATE LOCAL INDEX " + indexName + " ON " + fullTableName - + " (v1) INCLUDE (v2)"); - } else { - conn.createStatement().execute( - "CREATE INDEX " + indexName + " ON " + fullTableName - + " (v1) INCLUDE (v2)"); - } - - query = "SELECT * FROM " + fullIndexName; - rs = conn.createStatement().executeQuery(query); - assertFalse(rs.next()); - - // Verify the metadata for index is correct. 
- rs = conn.getMetaData().getTables(null, - StringUtil.escapeLike(TestUtil.DEFAULT_SCHEMA_NAME), indexName, - new String[] { PTableType.INDEX.toString() }); - assertTrue(rs.next()); - assertEquals(indexName, rs.getString(3)); - assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE")); - assertFalse(rs.next()); - - PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName - + " VALUES(?,?,?)"); - stmt.setString(1, "1"); - stmt.setString(2, "aaa"); - stmt.setString(3, "a1"); - stmt.execute(); - conn.commit(); - - FAIL_WRITE = true; - stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)"); - stmt.setString(1, "2"); - stmt.setString(2, "bbb"); - stmt.setString(3, "b2"); - stmt.execute(); - try { - conn.commit(); - fail(); - } catch (SQLException e) { - } - - // Only successfully committed row should be seen - query = "SELECT /*+ NO_INDEX*/ v1 FROM " + fullTableName; - rs = conn.createStatement().executeQuery(query); - assertTrue(rs.next()); - assertEquals("aaa", rs.getString(1)); - assertFalse(rs.next()); - - // Verify the metadata for index is correct. - rs = conn.getMetaData().getTables(null, - StringUtil.escapeLike(TestUtil.DEFAULT_SCHEMA_NAME), indexName, - new String[] { PTableType.INDEX.toString() }); - assertTrue(rs.next()); - assertEquals(indexName, rs.getString(3)); - // the index is always active for tables upon index table write failure - assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE")); - assertFalse(rs.next()); - - // if the table is transactional the write to the index table will fail because the - // index has not been disabled - // Verify UPSERT on data table is blocked after index write failed - stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)"); - stmt.setString(1, "3"); - stmt.setString(2, "ccc"); - stmt.setString(3, "3c"); - try { - stmt.execute(); - /* Writes would be blocked */ - conn.commit(); - fail(); - } catch (SQLException e) { - assertEquals(SQLExceptionCode.INDEX_FAILURE_BLOCK_WRITE.getErrorCode(), e.getErrorCode()); - } - - FAIL_WRITE = false; - // Second attempt at writing will succeed - int retries = 0; - do { - Thread.sleep(5 * 1000); // sleep 5 secs - if(!hasIndexDisableTimestamp(conn, indexName)){ - break; - } - if (++retries == 5) { - fail("Failed to rebuild index with allowed time"); - } - } while(true); - - // Verify UPSERT on data table still work after index table is recreated - stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)"); - stmt.setString(1, "4"); - stmt.setString(2, "ddd"); - stmt.setString(3, "4d"); - stmt.execute(); - conn.commit(); - - // verify index table has data - query = "SELECT count(1) FROM " + fullIndexName; - rs = conn.createStatement().executeQuery(query); - assertTrue(rs.next()); - assertEquals(3, rs.getInt(1)); - - query = "SELECT /*+ INDEX(" + indexName + ") */ v1 FROM " + fullTableName; - rs = conn.createStatement().executeQuery(query); - assertTrue(rs.next()); - assertEquals("aaa", rs.getString(1)); - assertTrue(rs.next()); - assertEquals("bbb", rs.getString(1)); - assertTrue(rs.next()); - assertEquals("ddd", rs.getString(1)); - assertFalse(rs.next()); - - query = "SELECT /*+ NO_INDEX*/ v1 FROM " + fullTableName; - rs = conn.createStatement().executeQuery(query); - assertTrue(rs.next()); - assertEquals("aaa", rs.getString(1)); - assertTrue(rs.next()); - assertEquals("bbb", rs.getString(1)); - assertTrue(rs.next()); - assertEquals("ddd", rs.getString(1)); - assertFalse(rs.next()); - } - } 
- - private static boolean hasIndexDisableTimestamp(Connection conn, String indexName) throws SQLException { - ResultSet rs = conn.createStatement().executeQuery("SELECT " + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP + - " FROM " + PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME + - " WHERE " + PhoenixDatabaseMetaData.COLUMN_NAME + " IS NULL" + - " AND " + PhoenixDatabaseMetaData.TENANT_ID + " IS NULL" + - " AND " + PhoenixDatabaseMetaData.TABLE_SCHEM + " IS NULL" + - " AND " + PhoenixDatabaseMetaData.TABLE_NAME + " = '" + indexName + "'"); - assertTrue(rs.next()); - long ts = rs.getLong(1); - return (!rs.wasNull() && ts > 0); - } - - - public static class FailingRegionObserver extends SimpleRegionObserver { - @Override - public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException { - if (c.getEnvironment().getRegionInfo().getTable().getNameAsString().contains(INDEX_NAME) && FAIL_WRITE) { - throw new DoNotRetryIOException(); - } - Mutation operation = miniBatchOp.getOperation(0); - Set<byte[]> keySet = operation.getFamilyMap().keySet(); - for(byte[] family: keySet) { - if(Bytes.toString(family).startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX) && FAIL_WRITE) { - throw new DoNotRetryIOException(); - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/phoenix/blob/b1ddaa2b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java index cee545a..fe9be6e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java @@ -79,6 +79,7 @@ import org.apache.phoenix.schema.ReadOnlyTableException; import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.schema.types.PLong; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.IndexUtil; @@ -533,6 +534,10 @@ public class DeleteCompiler { } else if (runOnServer) { // TODO: better abstraction Scan scan = context.getScan(); + // Propagate IGNORE_NEWER_MUTATIONS when replaying mutations since there will be + // future dated data row mutations that will get in the way of generating the + // correct index rows on replay. 
+ scan.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS, PDataType.TRUE_BYTES); scan.setAttribute(BaseScannerRegionObserver.DELETE_AGG, QueryConstants.TRUE); // Build an ungrouped aggregate query: select COUNT(*) from <table> where <where> http://git-wip-us.apache.org/repos/asf/phoenix/blob/b1ddaa2b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java index bbbd483..e5307d3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java @@ -713,6 +713,10 @@ public class UpsertCompiler { */ final StatementContext context = queryPlan.getContext(); final Scan scan = context.getScan(); + // Propagate IGNORE_NEWER_MUTATIONS when replaying mutations since there will be + // future dated data row mutations that will get in the way of generating the + // correct index rows on replay. + scan.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS, PDataType.TRUE_BYTES); scan.setAttribute(BaseScannerRegionObserver.UPSERT_SELECT_TABLE, UngroupedAggregateRegionObserver.serialize(projectedTable)); scan.setAttribute(BaseScannerRegionObserver.UPSERT_SELECT_EXPRS, UngroupedAggregateRegionObserver.serialize(projectedExpressions)); http://git-wip-us.apache.org/repos/asf/phoenix/blob/b1ddaa2b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java index 800b8a1..a02f4bc 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java @@ -3418,7 +3418,11 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso Cell newDisableTimeStampCell = newKVs.get(disableTimeStampKVIndex); long newDisableTimeStamp = (Long) PLong.INSTANCE.toObject(newDisableTimeStampCell.getValueArray(), newDisableTimeStampCell.getValueOffset(), newDisableTimeStampCell.getValueLength()); - if(curTimeStampVal > 0 && curTimeStampVal < newDisableTimeStamp){ + // We use the sign of the INDEX_DISABLE_TIMESTAMP to differentiate the keep-index-active (negative) + // from block-writes-to-data-table case. In either case, we want to keep the oldest timestamp to + // drive the partial index rebuild rather than update it with each attempt to update the index + // when a new data table write occurs. 
+ if (curTimeStampVal != 0 && Math.abs(curTimeStampVal) < Math.abs(newDisableTimeStamp)) { // not reset disable timestamp newKVs.remove(disableTimeStampKVIndex); disableTimeStampKVIndex = -1; http://git-wip-us.apache.org/repos/asf/phoenix/blob/b1ddaa2b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java index 9482d37..ce42de6 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java @@ -20,7 +20,6 @@ package org.apache.phoenix.coprocessor; import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES; import java.io.IOException; -import java.sql.PreparedStatement; import java.sql.SQLException; import java.util.ArrayList; import java.util.Collections; @@ -74,7 +73,6 @@ import org.apache.phoenix.schema.MetaDataClient; import org.apache.phoenix.schema.PIndexState; import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PTable; -import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.types.PLong; @@ -100,7 +98,6 @@ public class MetaDataRegionObserver extends BaseRegionObserver { protected ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); private boolean enableRebuildIndex = QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD; private long rebuildIndexTimeInterval = QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_INTERVAL; - private boolean blockWriteRebuildIndex = false; private static Map<PName, Long> batchExecutedPerTableMap = new HashMap<PName, Long>(); @Override @@ -128,8 +125,6 @@ public class MetaDataRegionObserver extends BaseRegionObserver { QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD); rebuildIndexTimeInterval = env.getConfiguration().getLong(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB, QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_INTERVAL); - blockWriteRebuildIndex = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_BLOCK_WRITE, - QueryServicesOptions.DEFAULT_INDEX_FAILURE_BLOCK_WRITE); } @@ -172,7 +167,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver { t.setDaemon(true); t.start(); - if (!enableRebuildIndex && !blockWriteRebuildIndex) { + if (!enableRebuildIndex) { LOG.info("Failure Index Rebuild is skipped by configuration."); return; } @@ -229,7 +224,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver { Scan scan = new Scan(); SingleColumnValueFilter filter = new SingleColumnValueFilter(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES, - CompareFilter.CompareOp.GREATER, PLong.INSTANCE.toBytes(0L)); + CompareFilter.CompareOp.NOT_EQUAL, PLong.INSTANCE.toBytes(0L)); filter.setFilterIfMissing(true); scan.setFilter(filter); scan.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, @@ -240,10 +235,8 @@ public class MetaDataRegionObserver extends BaseRegionObserver { PhoenixDatabaseMetaData.INDEX_STATE_BYTES); scan.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES); - PreparedStatement 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b1ddaa2b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
index 9482d37..ce42de6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
@@ -20,7 +20,6 @@ package org.apache.phoenix.coprocessor;
 import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
 
 import java.io.IOException;
-import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -74,7 +73,6 @@ import org.apache.phoenix.schema.MetaDataClient;
 import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.types.PLong;
@@ -100,7 +98,6 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
     protected ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
     private boolean enableRebuildIndex = QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD;
     private long rebuildIndexTimeInterval = QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_INTERVAL;
-    private boolean blockWriteRebuildIndex = false;
     private static Map<PName, Long> batchExecutedPerTableMap = new HashMap<PName, Long>();
 
     @Override
@@ -128,8 +125,6 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
                 QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD);
         rebuildIndexTimeInterval = env.getConfiguration().getLong(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB,
                 QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_INTERVAL);
-        blockWriteRebuildIndex = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_BLOCK_WRITE,
-                QueryServicesOptions.DEFAULT_INDEX_FAILURE_BLOCK_WRITE);
     }
 
@@ -172,7 +167,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
         t.setDaemon(true);
         t.start();
 
-        if (!enableRebuildIndex && !blockWriteRebuildIndex) {
+        if (!enableRebuildIndex) {
             LOG.info("Failure Index Rebuild is skipped by configuration.");
             return;
         }
@@ -229,7 +224,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
                 Scan scan = new Scan();
                 SingleColumnValueFilter filter = new SingleColumnValueFilter(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                         PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES,
-                        CompareFilter.CompareOp.GREATER, PLong.INSTANCE.toBytes(0L));
+                        CompareFilter.CompareOp.NOT_EQUAL, PLong.INSTANCE.toBytes(0L));
                 filter.setFilterIfMissing(true);
                 scan.setFilter(filter);
                 scan.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
@@ -240,10 +235,8 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
                         PhoenixDatabaseMetaData.INDEX_STATE_BYTES);
                 scan.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                         PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES);
-                PreparedStatement updateDisabledTimeStampSmt = null;
                 Map<PTable, List<PTable>> dataTableToIndexesMap = null;
-                MetaDataClient client = null;
                 boolean hasMore = false;
                 List<Cell> results = new ArrayList<Cell>();
                 scanner = this.env.getRegion().getScanner(scan);
@@ -259,17 +252,10 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
                         byte[] indexState = r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                                 PhoenixDatabaseMetaData.INDEX_STATE_BYTES);
-                        if (disabledTimeStamp == null || disabledTimeStamp.length == 0 || (indexState != null
-                                && PIndexState.BUILDING == PIndexState.fromSerializedValue(Bytes.toString(indexState)))) {
-                            // Don't rebuild the building index , because they are marked for aysnc
+                        if (disabledTimeStamp == null || disabledTimeStamp.length == 0) {
                             continue;
                         }
-                        // disableTimeStamp has to be a positive value
-                        long disabledTimeStampVal = PLong.INSTANCE.getCodec().decodeLong(disabledTimeStamp, 0, SortOrder.getDefault());
-                        if (disabledTimeStampVal <= 0) {
-                            continue;
-                        }
                         byte[] dataTable = r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                                 PhoenixDatabaseMetaData.DATA_TABLE_NAME_BYTES);
                         if ((dataTable == null || dataTable.length == 0) || (indexState == null || indexState.length == 0)) {
@@ -302,7 +288,6 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
                                 // don't run a second index populations upsert select
                                 props.setProperty(QueryServices.INDEX_POPULATION_SLEEP_TIME, "0");
                                 conn = QueryUtil.getConnectionOnServer(props, env.getConfiguration()).unwrap(PhoenixConnection.class);
-                                client = new MetaDataClient(conn);
                                 dataTableToIndexesMap = Maps.newHashMap();
                             }
                             String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTable);
@@ -331,7 +316,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
                             dataTableToIndexesMap.put(dataPTable, indexesToPartiallyRebuild);
                         }
                         LOG.debug("We have found " + indexPTable.getIndexState() + " Index:" + indexPTable.getName()
-                                + " on data table:" + dataPTable.getName() + " which was disabled at "
+                                + " on data table:" + dataPTable.getName() + " which failed to be updated at "
                                 + indexPTable.getIndexDisableTimestamp());
                         indexesToPartiallyRebuild.add(indexPTable);
                     } while (hasMore);
@@ -349,9 +334,22 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
                     long earliestDisableTimestamp = Long.MAX_VALUE;
                     List<IndexMaintainer> maintainers = Lists
                             .newArrayListWithExpectedSize(indexesToPartiallyRebuild.size());
+                    int signOfDisableTimeStamp = 0;
                     for (PTable index : indexesToPartiallyRebuild) {
+                        // We need a way of differentiating the block writes to data table case from
+                        // the leave index active case. In either case, we need to know the time stamp
+                        // at which writes started failing so we can rebuild from that point. If we
+                        // keep the index active *and* have a positive INDEX_DISABLE_TIMESTAMP_BYTES,
+                        // then writes to the data table will be blocked (this is client side logic
+                        // and we can't change this in a minor release). So we use the sign of the
+                        // time stamp to differentiate.
                         long disabledTimeStampVal = index.getIndexDisableTimestamp();
-                        if (disabledTimeStampVal > 0) {
+                        if (disabledTimeStampVal != 0) {
+                            if (signOfDisableTimeStamp != 0 && signOfDisableTimeStamp != Long.signum(disabledTimeStampVal)) {
+                                LOG.warn("Found unexpected mix of signs with INDEX_DISABLE_TIMESTAMP for " + dataPTable.getName().getString() + " with " + indexesToPartiallyRebuild);
+                            }
+                            signOfDisableTimeStamp = Long.signum(disabledTimeStampVal);
+                            disabledTimeStampVal = Math.abs(disabledTimeStampVal);
                             if (disabledTimeStampVal < earliestDisableTimestamp) {
                                 earliestDisableTimestamp = disabledTimeStampVal;
                             }
@@ -409,8 +407,8 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
                                 batchExecutedPerTableMap.remove(dataPTable.getName());
                                 LOG.info("Making Index:" + indexPTable.getTableName() + " active after rebuilding");
                             } else {
-
-                                updateDisableTimestamp(conn, indexTableFullName, env, scanEndTime, metaTable);
+                                // Maintain sign of INDEX_DISABLE_TIMESTAMP (see comment above)
+                                updateDisableTimestamp(conn, indexTableFullName, env, scanEndTime * signOfDisableTimeStamp, metaTable);
                                 Long noOfBatches = batchExecutedPerTableMap.get(dataPTable.getName());
                                 if (noOfBatches == null) {
                                     noOfBatches = 0l;
@@ -507,7 +505,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
             put.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                     PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES, PLong.INSTANCE.toBytes(disabledTimestamp));
             metaTable.checkAndPut(indexTableKey, PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
-                    PhoenixDatabaseMetaData.INDEX_STATE_BYTES, CompareOp.EQUAL, PIndexState.INACTIVE.getSerializedBytes(),
+                    PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES, CompareOp.NOT_EQUAL, PLong.INSTANCE.toBytes(0),
                     put);
         }
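
The rebuild loop above reduces the signed per-index timestamps of one data table to a single earliest rebuild point. Condensed into a standalone sketch (names illustrative, warning via stderr instead of the observer's LOG):

    import java.util.List;

    // Record the sign once, flag a mix of policies across the table's indexes,
    // and compare magnitudes so the rebuild starts at the oldest failure.
    static long earliestDisableTimestamp(List<Long> indexDisableTimestamps) {
        long earliest = Long.MAX_VALUE;
        int sign = 0;
        for (long ts : indexDisableTimestamps) {
            if (ts == 0) continue; // healthy index, nothing to rebuild
            if (sign != 0 && sign != Long.signum(ts)) {
                System.err.println("Unexpected mix of INDEX_DISABLE_TIMESTAMP signs");
            }
            sign = Long.signum(ts);
            earliest = Math.min(earliest, Math.abs(ts));
        }
        return earliest;
    }
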
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b1ddaa2b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 49ef884..a056807 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -379,6 +379,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         RegionScanner theScanner = s;
 
+        boolean replayMutations = scan.getAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS) != null;
         byte[] indexUUID = scan.getAttribute(PhoenixIndexCodec.INDEX_UUID);
         byte[] txState = scan.getAttribute(BaseScannerRegionObserver.TX_STATE);
         List<Expression> selectExpressions = null;
@@ -610,6 +611,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                                 Cell firstKV = results.get(0);
                                 Delete delete = new Delete(firstKV.getRowArray(), firstKV.getRowOffset(),
                                     firstKV.getRowLength(),ts);
+                                if (replayMutations) {
+                                    delete.setAttribute(IGNORE_NEWER_MUTATIONS, PDataType.TRUE_BYTES);
+                                }
                                 mutations.add(delete);
                                 // force tephra to ignore this deletes
                                 delete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
@@ -661,6 +665,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                                 }
                             }
                             for (Mutation mutation : row.toRowMutations()) {
+                                if (replayMutations) {
+                                    mutation.setAttribute(IGNORE_NEWER_MUTATIONS, PDataType.TRUE_BYTES);
+                                }
                                 mutations.add(mutation);
                             }
                             for (i = 0; i < selectExpressions.size(); i++) {
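
Both hunks apply the same rule: if the scan that produced a batch was marked as a replay, every mutation in the batch must carry the marker too, so that index maintenance ignores future-dated cells. As one helper, with the same stand-in constants as the earlier sketch:

    import java.util.List;
    import org.apache.hadoop.hbase.client.Mutation;
    import org.apache.hadoop.hbase.client.Scan;

    public class ReplayPropagationSketch {
        static final String IGNORE_NEWER_MUTATIONS = "IGNORE_NEWER_MUTATIONS"; // stand-in key
        static final byte[] TRUE_BYTES = new byte[] { 1 };                     // stand-in value

        // Copy the replay marker from the driving scan onto deletes and upserts alike.
        static void propagateReplayFlag(Scan scan, List<Mutation> mutations) {
            if (scan.getAttribute(IGNORE_NEWER_MUTATIONS) != null) {
                for (Mutation m : mutations) {
                    m.setAttribute(IGNORE_NEWER_MUTATIONS, TRUE_BYTES);
                }
            }
        }
    }
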
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b1ddaa2b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 2836c45..35ba187 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -182,6 +182,8 @@ public enum SQLExceptionCode {
     ROWTIMESTAMP_COL_INVALID_TYPE(530, "42907", "A column can be added as ROW_TIMESTAMP only if it is of type DATE, BIGINT, TIME OR TIMESTAMP."),
     ROWTIMESTAMP_NOT_ALLOWED_ON_VIEW(531, "42908", "Declaring a column as row_timestamp is not allowed for views."),
     INVALID_SCN(532, "42909", "Value of SCN cannot be less than zero."),
+    INVALID_REPLAY_AT(533, "42910", "Value of REPLAY_AT cannot be less than zero."),
+    UNEQUAL_SCN_AND_REPLAY_AT(534, "42911", "If both specified, values of CURRENT_SCN and REPLAY_AT must be equal."),
     /**
      * HBase and Phoenix specific implementation defined sub-classes.
      * Column family related exceptions.

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b1ddaa2b/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java
index a9d8311..b0d22d3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java
@@ -24,10 +24,16 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 public class CommitException extends SQLException {
     private static final long serialVersionUID = 2L;
     private final int[] uncommittedStatementIndexes;
+    private final long serverTimestamp;
 
-    public CommitException(Exception e, int[] uncommittedStatementIndexes) {
+    public CommitException(Exception e, int[] uncommittedStatementIndexes, long serverTimestamp) {
         super(e);
         this.uncommittedStatementIndexes = uncommittedStatementIndexes;
+        this.serverTimestamp = serverTimestamp;
+    }
+
+    public long getServerTimestamp() {
+        return this.serverTimestamp;
     }
 
     /**
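
Taken together, these pieces shift recovery to the client: the two new error codes validate the replay-at connection semantics, and the time stamp now attached to CommitException tells a client where such a replay should begin. A hypothetical handler, not Phoenix code:

    import java.sql.Connection;
    import org.apache.phoenix.execute.CommitException;

    public class ReplayOnFailureSketch {
        static void commitWithRecovery(Connection conn) throws Exception {
            try {
                conn.commit();
            } catch (CommitException e) {
                // HConstants.LATEST_TIMESTAMP when no server time could be parsed.
                long failureTime = e.getServerTimestamp();
                // A client that retained its uncommitted mutations could reopen a
                // connection positioned at failureTime and re-issue the failed
                // statements, relying on IGNORE_NEWER_MUTATIONS during the replay.
            }
        }
    }
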
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b1ddaa2b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index d32199b..6144c7f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -82,6 +82,7 @@ import org.apache.phoenix.schema.RowKeySchema;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.ValueSchema.Field;
+import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.trace.util.Tracing;
 import org.apache.phoenix.util.ByteUtil;
@@ -673,6 +674,14 @@ public class MutationState implements SQLCloseable {
                 rowMutationsPertainingToIndex = rowMutations;
             }
             mutationList.addAll(rowMutations);
+            if (connection.isReplayMutations()) {
+                // Propagate IGNORE_NEWER_MUTATIONS when replaying mutations since there will be
+                // future dated data row mutations that will get in the way of generating the
+                // correct index rows on replay.
+                for (Mutation mutation : rowMutations) {
+                    mutation.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS, PDataType.TRUE_BYTES);
+                }
+            }
             if (mutationsPertainingToIndex != null) mutationsPertainingToIndex
                     .addAll(rowMutationsPertainingToIndex);
         }
@@ -1030,6 +1039,7 @@ public class MutationState implements SQLCloseable {
                 joinMutationState(new TableRef(tableRef), valuesMap, txMutations);
             }
         }
+        long serverTimestamp = HConstants.LATEST_TIMESTAMP;
         Iterator<Entry<TableInfo, List<Mutation>>> mutationsIterator = physicalTableMutationMap.entrySet().iterator();
         while (mutationsIterator.hasNext()) {
             Entry<TableInfo, List<Mutation>> pair = mutationsIterator.next();
@@ -1106,6 +1116,7 @@ public class MutationState implements SQLCloseable {
                 // Remove batches as we process them
                 mutations.remove(origTableRef);
             } catch (Exception e) {
+                serverTimestamp = ServerUtil.parseServerTimestamp(e);
                 SQLException inferredE = ServerUtil.parseServerExceptionOrNull(e);
                 if (inferredE != null) {
                     if (shouldRetry && retryCount == 0 && inferredE.getErrorCode() == SQLExceptionCode.INDEX_METADATA_NOT_FOUND.getErrorCode()) {
@@ -1127,7 +1138,7 @@ public class MutationState implements SQLCloseable {
                 }
                 // Throw to client an exception that indicates the statements that
                 // were not committed successfully.
-                sqlE = new CommitException(e, getUncommittedStatementIndexes());
+                sqlE = new CommitException(e, getUncommittedStatementIndexes(), serverTimestamp);
             } finally {
                 try {
                     if (cache!=null)

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b1ddaa2b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
index 831aa16..a037e92 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
@@ -61,6 +61,10 @@ public class IndexWriter implements Stoppable {
         this(getCommitter(env), getFailurePolicy(env), env, name);
     }
 
+    public IndexWriter(IndexFailurePolicy failurePolicy, RegionCoprocessorEnvironment env, String name) throws IOException {
+        this(getCommitter(env), failurePolicy, env, name);
+    }
+
     public static IndexCommitter getCommitter(RegionCoprocessorEnvironment env) throws IOException {
         Configuration conf = env.getConfiguration();
         try {
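
The new overload makes the failure policy injectable instead of always configuration-derived, which is what allows a caller to write index updates with the LeaveIndexActiveFailurePolicy introduced in the next file. A sketch of a call site, assuming env is the enclosing coprocessor's RegionCoprocessorEnvironment and the name string is arbitrary:

    // The committer still comes from configuration; only the policy is explicit.
    IndexWriter writer = new IndexWriter(new LeaveIndexActiveFailurePolicy(), env, "recovery-index-writer");
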
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b1ddaa2b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/LeaveIndexActiveFailurePolicy.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/LeaveIndexActiveFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/LeaveIndexActiveFailurePolicy.java
new file mode 100644
index 0000000..edacd3a
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/LeaveIndexActiveFailurePolicy.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.write;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
+import org.apache.phoenix.util.ServerUtil;
+
+import com.google.common.collect.Multimap;
+
+/**
+ *
+ * Implementation of IndexFailurePolicy which takes no action when an
+ * index cannot be updated. As with the standard flow of control, an
+ * exception will still be thrown back to the client. Using this failure
+ * policy means that the action to take upon failure is completely up
+ * to the client.
+ *
+ */
+public class LeaveIndexActiveFailurePolicy implements IndexFailurePolicy {
+
+    @Override
+    public boolean isStopped() {
+        return false;
+    }
+
+    @Override
+    public void stop(String arg0) {
+    }
+
+    @Override
+    public void setup(Stoppable parent, RegionCoprocessorEnvironment env) {
+    }
+
+    @Override
+    public void handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted, Exception cause)
+            throws IOException {
+        // get timestamp of first cell
+        long ts = attempted.values().iterator().next().getFamilyCellMap().values().iterator().next().get(0).getTimestamp();
+        throw ServerUtil.wrapInDoNotRetryIOException("Unable to update the following indexes: " + attempted.keySet(), cause, ts);
+    }
+
+}
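
The dense line in handleFailure is the timestamp extraction; unpacked, it is a three-step walk to the first cell of the first attempted mutation, whose timestamp approximates when index writes began failing (helper name illustrative):

    import java.util.List;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.client.Mutation;

    // first mutation -> its first column family's cell list -> first cell's timestamp
    static long firstCellTimestamp(Iterable<Mutation> attempted) {
        Mutation first = attempted.iterator().next();
        List<Cell> firstFamily = first.getFamilyCellMap().values().iterator().next();
        return firstFamily.get(0).getTimestamp();
    }
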
