PHOENIX-4096 Disallow DML operations on connections with CURRENT_SCN set
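In client terms: once the CurrentSCN property is set, a connection is pinned to a point in time and DML is rejected rather than written at the client-supplied timestamp. A condensed sketch of what a client now observes (the JDBC URL and table name are placeholders; the new TransactionIT#testQueryWithSCN in the diff below exercises the analogous read path on a transactional table):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;

import org.apache.phoenix.util.PhoenixRuntime;

public class CurrentScnDmlExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Pin the connection to a point in time via the CurrentSCN property.
        props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB,
                Long.toString(System.currentTimeMillis()));
        try (Connection conn =
                DriverManager.getConnection("jdbc:phoenix:localhost", props)) {
            // After this commit, DML on a CURRENT_SCN connection is rejected...
            conn.createStatement().execute(
                    "UPSERT INTO MY_TABLE (K, V1) VALUES ('a', 'b')");
        } catch (SQLException e) {
            // ...surfacing as a SQLException instead of a write at the SCN.
            System.err.println("Rejected, error code " + e.getErrorCode());
        }
    }
}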
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/06f58d56
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/06f58d56
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/06f58d56

Branch: refs/heads/master
Commit: 06f58d56c9e2b3badb5e3f6bd5092f03f58f00a8
Parents: 45079c4
Author: James Taylor <[email protected]>
Authored: Thu Sep 28 11:24:22 2017 -0700
Committer: James Taylor <[email protected]>
Committed: Thu Sep 28 17:51:53 2017 -0700

----------------------------------------------------------------------
 .../phoenix/end2end/ExecuteStatementsIT.java    |   4 +-
 .../end2end/IndexToolForPartialBuildIT.java     |   5 +-
 .../phoenix/end2end/QueryExecWithoutSCNIT.java  |   3 +-
 .../org/apache/phoenix/end2end/QueryMoreIT.java |   2 +-
 .../org/apache/phoenix/end2end/UpgradeIT.java   |  23 +-
 .../end2end/index/PartialIndexRebuilderIT.java  |   3 +-
 .../EndToEndCoveredColumnsIndexBuilderIT.java   | 354 ------------
 .../index/covered/FailWithoutRetriesIT.java     | 140 -----
 .../org/apache/phoenix/tx/TransactionIT.java    |  23 +
 .../phoenix/compile/CreateTableCompiler.java    |  22 +-
 .../apache/phoenix/compile/UpsertCompiler.java  |   8 +-
 .../coprocessor/MetaDataRegionObserver.java     |   3 -
 .../phoenix/exception/SQLExceptionCode.java     |   3 +-
 .../apache/phoenix/execute/MutationState.java   |   8 -
 .../org/apache/phoenix/hbase/index/Indexer.java |  11 +-
 .../apache/phoenix/jdbc/PhoenixConnection.java  | 567 ++++++++++++-------
 .../phoenix/mapreduce/PhoenixInputFormat.java   |   5 +-
 .../index/PhoenixIndexImportDirectMapper.java   |   2 +-
 .../index/PhoenixIndexImportMapper.java         |   2 +-
 .../index/PhoenixIndexPartialBuildMapper.java   |   2 +-
 .../query/ConnectionQueryServicesImpl.java      |   1 +
 .../query/ConnectionlessQueryServicesImpl.java  |   1 +
 .../apache/phoenix/schema/MetaDataClient.java   |  33 +-
 .../java/org/apache/phoenix/util/JDBCUtil.java  |   4 +-
 .../org/apache/phoenix/util/PhoenixRuntime.java |  23 +-
 .../phoenix/compile/QueryCompilerTest.java      |  30 +-
 .../java/org/apache/phoenix/util/TestUtil.java  |   8 +-
 27 files changed, 442 insertions(+), 848 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/06f58d56/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExecuteStatementsIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExecuteStatementsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExecuteStatementsIT.java
index c8c0d37..9c11144 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExecuteStatementsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExecuteStatementsIT.java
@@ -17,11 +17,9 @@
  */
 package org.apache.phoenix.end2end;
 
-import static org.apache.phoenix.util.TestUtil.ATABLE_NAME;
 import static org.apache.phoenix.util.TestUtil.A_VALUE;
 import static org.apache.phoenix.util.TestUtil.BTABLE_NAME;
 import static org.apache.phoenix.util.TestUtil.B_VALUE;
-import static org.apache.phoenix.util.TestUtil.PTSDB_NAME;
 import static org.apache.phoenix.util.TestUtil.ROW6;
 import static org.apache.phoenix.util.TestUtil.ROW7;
 import static org.apache.phoenix.util.TestUtil.ROW8;
@@ -145,7 +143,7 @@ public class ExecuteStatementsIT extends ParallelStatsDisabledIT {
         conn.commit();
 
         String btableName = generateUniqueName();
-        ensureTableCreated(getUrl(),btableName, BTABLE_NAME, nextTimestamp()-2);
+        ensureTableCreated(getUrl(),btableName, BTABLE_NAME);
         statement =
conn.prepareStatement( "upsert into " + btableName + " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"); statement.setString(1, "abc"); http://git-wip-us.apache.org/repos/asf/phoenix/blob/06f58d56/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java index b71750a..19ffe1a 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java @@ -162,7 +162,8 @@ public class IndexToolForPartialBuildIT extends BaseOwnClusterIT { PTable pindexTable = PhoenixRuntime.getTable(conn, SchemaUtil.getTableName(schemaName, indxTable)); assertEquals(PIndexState.BUILDING, pindexTable.getIndexState()); assertEquals(rs.getLong(1), pindexTable.getTimeStamp()); - //assert disabled timestamp is set correctly when index mutations are processed on the server + + //assert disabled timestamp assertEquals(0, rs.getLong(2)); String selectSql = String.format("SELECT LPAD(UPPER(NAME),11,'x')||'_xyz',ID FROM %s", fullTableName); @@ -216,8 +217,6 @@ public class IndexToolForPartialBuildIT extends BaseOwnClusterIT { assertEquals("xxUNAME" + i*1000 + "_xyz", rs.getString(1)); } assertFalse(rs.next()); - - // conn.createStatement().execute(String.format("DROP INDEX %s ON %s", indxTable, fullTableName)); } finally { conn.close(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/06f58d56/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryExecWithoutSCNIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryExecWithoutSCNIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryExecWithoutSCNIT.java index 51d08d8..a18caf8 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryExecWithoutSCNIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryExecWithoutSCNIT.java @@ -35,9 +35,8 @@ import org.junit.Test; public class QueryExecWithoutSCNIT extends ParallelStatsDisabledIT { @Test public void testScanNoSCN() throws Exception { - long ts = System.currentTimeMillis(); String tenantId = getOrganizationId(); - String tableName = initATableValues(tenantId, getDefaultSplits(tenantId), null, ts, getUrl()); + String tableName = initATableValues(tenantId, getDefaultSplits(tenantId), null, null, getUrl()); String query = "SELECT a_string, b_string FROM " + tableName + " WHERE organization_id=? 
and a_integer = 5"; Properties props = new Properties(); // Test with no CurrentSCN property set Connection conn = DriverManager.getConnection(getUrl(), props); http://git-wip-us.apache.org/repos/asf/phoenix/blob/06f58d56/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java index 8397e4d..77cb19f 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java @@ -336,7 +336,7 @@ public class QueryMoreIT extends ParallelStatsDisabledIT { @Test // see - https://issues.apache.org/jira/browse/PHOENIX-1696 public void testSelectColumnMoreThanOnce() throws Exception { Date date = new Date(System.currentTimeMillis()); - initEntityHistoryTableValues("abcd", getDefaultSplits("abcd"), date, 100l); + initEntityHistoryTableValues("abcd", getDefaultSplits("abcd"), date, null); String query = "SELECT NEW_VALUE, NEW_VALUE FROM " + TestUtil.ENTITY_HISTORY_TABLE_NAME + " LIMIT 1"; ResultSet rs = DriverManager.getConnection(getUrl()).createStatement().executeQuery(query); assertTrue(rs.next()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/06f58d56/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java index 8cba241..4cb4642 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java @@ -20,7 +20,6 @@ package org.apache.phoenix.end2end; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static org.apache.phoenix.query.ConnectionQueryServicesImpl.UPGRADE_MUTEX; -import static org.apache.phoenix.query.ConnectionQueryServicesImpl.UPGRADE_MUTEX_LOCKED; import static org.apache.phoenix.query.ConnectionQueryServicesImpl.UPGRADE_MUTEX_UNLOCKED; import static org.apache.phoenix.query.QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT; import static org.apache.phoenix.query.QueryConstants.DIVERGED_VIEW_BASE_COLUMN_COUNT; @@ -64,10 +63,16 @@ import org.apache.phoenix.query.ConnectionQueryServicesImpl; import org.apache.phoenix.query.DelegateConnectionQueryServices; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServices; -import org.apache.phoenix.schema.*; +import org.apache.phoenix.schema.PMetaData; +import org.apache.phoenix.schema.PName; +import org.apache.phoenix.schema.PNameFactory; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.SchemaUtil; +import org.apache.phoenix.util.TestUtil; import org.apache.phoenix.util.UpgradeUtil; import org.junit.Before; import org.junit.Test; @@ -490,7 +495,7 @@ public class UpgradeIT extends ParallelStatsDisabledIT { } // run upgrade - UpgradeUtil.upgradeTo4_5_0(conn.unwrap(PhoenixConnection.class)); + upgradeTo4_5_0(conn); // Verify base column counts for tenant specific views for (int i = 0; i < 2 ; i++) 
{ @@ -508,6 +513,12 @@ public class UpgradeIT extends ParallelStatsDisabledIT { } + + private static void upgradeTo4_5_0(Connection conn) throws SQLException { + PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class); + pconn.setRunningUpgrade(true); + UpgradeUtil.upgradeTo4_5_0(pconn); + } private enum ColumnDiff { MORE, EQUAL, LESS @@ -570,7 +581,7 @@ public class UpgradeIT extends ParallelStatsDisabledIT { checkBaseColumnCount(null, baseTableSchema, baseTableName, 0); // run upgrade - UpgradeUtil.upgradeTo4_5_0(conn.unwrap(PhoenixConnection.class)); + upgradeTo4_5_0(conn); checkBaseColumnCount(tenantId, viewSchema, viewName, expectedBaseColumnCount); checkBaseColumnCount(null, baseTableSchema, baseTableName, BASE_TABLE_BASE_COLUMN_COUNT); @@ -639,8 +650,8 @@ public class UpgradeIT extends ParallelStatsDisabledIT { return true; } }; - try (PhoenixConnection phxConn = new PhoenixConnection(servicesWithUpgrade, - conn.unwrap(PhoenixConnection.class), HConstants.LATEST_TIMESTAMP)) { + try (PhoenixConnection phxConn = new PhoenixConnection(servicesWithUpgrade, getUrl(), PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES), + conn.unwrap(PhoenixConnection.class).getMetaDataCache())) { try { phxConn.createStatement().execute( "CREATE TABLE " + generateUniqueName() http://git-wip-us.apache.org/repos/asf/phoenix/blob/06f58d56/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java index 8bf2bc8..d0a06b4 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java @@ -895,7 +895,8 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { assertEquals("0", rs.getString(1)); assertEquals(indexStateOnFailure == PIndexState.DISABLE ? fullTableName : fullIndexName, stmt.getQueryPlan().getContext().getCurrentTable().getTable().getName().getString()); TestUtil.removeCoprocessor(conn, fullIndexName, WriteFailingRegionObserver.class); - + + clock.time += 1000; runIndexRebuilder(); assertEquals(indexStateOnFailure == PIndexState.DISABLE ? PIndexState.INACTIVE : PIndexState.ACTIVE, TestUtil.getIndexState(conn, fullIndexName)); clock.time += WAIT_AFTER_DISABLED; http://git-wip-us.apache.org/repos/asf/phoenix/blob/06f58d56/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java deleted file mode 100644 index 5aa0161..0000000 --- a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java +++ /dev/null @@ -1,354 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.phoenix.hbase.index.covered; - -import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -import java.io.IOException; -import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Queue; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.Coprocessor; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.regionserver.Region; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite; -import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; -import org.apache.phoenix.hbase.index.IndexTestingUtils; -import org.apache.phoenix.hbase.index.Indexer; -import org.apache.phoenix.hbase.index.TableName; -import org.apache.phoenix.hbase.index.covered.update.ColumnReference; -import org.apache.phoenix.hbase.index.scanner.Scanner; -import org.apache.phoenix.util.EnvironmentEdge; -import org.apache.phoenix.util.EnvironmentEdgeManager; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -/** - * End-to-End test of just the {@link NonTxIndexBuilder}, but with a simple - * {@link IndexCodec} and BatchCache implementation. 
- */ -@Category(NeedsOwnMiniClusterTest.class) -public class EndToEndCoveredColumnsIndexBuilderIT { - - public class TestState { - - private HTable table; - private long ts; - private VerifyingIndexCodec codec; - - /** - * @param primary - * @param codec - * @param ts - */ - public TestState(HTable primary, VerifyingIndexCodec codec, long ts) { - this.table = primary; - this.ts = ts; - this.codec = codec; - } - - } - - private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); - - private static final byte[] row = Bytes.toBytes("row"); - private static final byte[] family = Bytes.toBytes("FAM"); - private static final byte[] qual = Bytes.toBytes("qual"); - private static final HColumnDescriptor FAM1 = new HColumnDescriptor(family); - - @Rule - public TableName TestTable = new TableName(); - - private TestState state; - - @BeforeClass - public static void setupCluster() throws Exception { - Configuration conf = UTIL.getConfiguration(); - setUpConfigForMiniCluster(conf); - IndexTestingUtils.setupConfig(conf); - // disable version checking, so we can test against whatever version of HBase happens to be - // installed (right now, its generally going to be SNAPSHOT versions). - conf.setBoolean(Indexer.CHECK_VERSION_CONF_KEY, false); - UTIL.startMiniCluster(); - } - - @Before - public void setup() throws Exception { - this.state = setupTest(TestTable.getTableNameString()); - } - - private interface TableStateVerifier { - - /** - * Verify that the state of the table is correct. Should fail the unit test if it isn't as - * expected. - * @param state - */ - public void verify(TableState state); - - } - - /** - * {@link TableStateVerifier} that ensures the kvs returned from the table match the passed - * {@link KeyValue}s when querying on the given columns. - */ - private class ListMatchingVerifier implements TableStateVerifier { - - private List<Cell> expectedKvs; - private ColumnReference[] columns; - private String msg; - - public ListMatchingVerifier(String msg, List<Cell> kvs, ColumnReference... columns) { - this.expectedKvs = kvs; - this.columns = columns; - this.msg = msg; - } - - @Override - public void verify(TableState state) { - IndexMetaData indexMetaData = new IndexMetaData() { - - @Override - public boolean isImmutableRows() { - return false; - } - - @Override - public ReplayWrite getReplayWrite() { - return null; - } - - }; - try { - Scanner kvs = - ((LocalTableState) state).getIndexedColumnsTableState(Arrays.asList(columns), false, false, indexMetaData).getFirst(); - - int count = 0; - Cell kv; - while ((kv = kvs.next()) != null) { - Cell next = expectedKvs.get(count++); - assertEquals( - msg + ": Unexpected kv in table state!\nexpected v1: " - + Bytes.toString(next.getValue()) + "\nactual v1:" + Bytes.toString(kv.getValue()), - next, kv); - } - - assertEquals(msg + ": Didn't find enough kvs in table state!", expectedKvs.size(), count); - } catch (IOException e) { - fail(msg + ": Got an exception while reading local table state! 
" + e.getMessage()); - } - } - } - - private class VerifyingIndexCodec extends CoveredIndexCodecForTesting { - - private Queue<TableStateVerifier> verifiers = new ArrayDeque<TableStateVerifier>(); - - @Override - public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context) { - verify(state); - return super.getIndexDeletes(state, context); - } - - @Override - public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData context) { - verify(state); - return super.getIndexUpserts(state, context); - } - - private void verify(TableState state) { - TableStateVerifier verifier = verifiers.poll(); - if (verifier == null) return; - verifier.verify(state); - } - } - - /** - * Test that we see the expected values in a {@link TableState} when doing single puts against a - * region. - * @throws Exception on failure - */ - @Test - public void testExpectedResultsInTableStateForSinglePut() throws Exception { - //just do a simple Put to start with - long ts = state.ts; - Put p = new Put(row, ts); - p.add(family, qual, Bytes.toBytes("v1")); - - // get all the underlying kvs for the put - final List<Cell> expectedKvs = new ArrayList<Cell>(); - final List<Cell> allKvs = new ArrayList<Cell>(); - allKvs.addAll(p.getFamilyMap().get(family)); - - // setup the verifier for the data we expect to write - // first call shouldn't have anything in the table - final ColumnReference familyRef = - new ColumnReference(EndToEndCoveredColumnsIndexBuilderIT.family, ColumnReference.ALL_QUALIFIERS); - - VerifyingIndexCodec codec = state.codec; - codec.verifiers.add(new ListMatchingVerifier("cleanup state 1", expectedKvs, familyRef)); - codec.verifiers.add(new ListMatchingVerifier("put state 1", allKvs, familyRef)); - - // do the actual put (no indexing will actually be done) - HTable primary = state.table; - primary.put(p); - primary.flushCommits(); - - // now we do another put to the same row. We should see just the old row state, followed by the - // new + old - p = new Put(row, ts + 1); - p.add(family, qual, Bytes.toBytes("v2")); - expectedKvs.addAll(allKvs); - // add them first b/c the ts is newer - allKvs.addAll(0, p.get(family, qual)); - codec.verifiers.add(new ListMatchingVerifier("cleanup state 2", expectedKvs, familyRef)); - codec.verifiers.add(new ListMatchingVerifier("put state 2", allKvs, familyRef)); - - // do the actual put - primary.put(p); - primary.flushCommits(); - - // cleanup after ourselves - cleanup(state); - } - - /** - * Similar to {@link #testExpectedResultsInTableStateForSinglePut()}, but against batches of puts. - * Previous implementations managed batches by playing current state against each element in the - * batch, rather than combining all the per-row updates into a single mutation for the batch. This - * test ensures that we see the correct expected state. - * @throws Exception on failure - */ - @Test - public void testExpectedResultsInTableStateForBatchPuts() throws Exception { - long ts = state.ts; - // build up a list of puts to make, all on the same row - Put p1 = new Put(row, ts); - p1.add(family, qual, Bytes.toBytes("v1")); - Put p2 = new Put(row, ts + 1); - p2.add(family, qual, Bytes.toBytes("v2")); - - // setup all the verifiers we need. This is just the same as above, but will be called twice - // since we need to iterate the batch. 
- - // get all the underlying kvs for the put - final List<Cell> allKvs = new ArrayList<Cell>(2); - allKvs.addAll(p2.getFamilyCellMap().get(family)); - allKvs.addAll(p1.getFamilyCellMap().get(family)); - - // setup the verifier for the data we expect to write - // both puts should be put into a single batch - final ColumnReference familyRef = - new ColumnReference(EndToEndCoveredColumnsIndexBuilderIT.family, ColumnReference.ALL_QUALIFIERS); - VerifyingIndexCodec codec = state.codec; - // no previous state in the table - codec.verifiers.add(new ListMatchingVerifier("cleanup state 1", Collections - .<Cell> emptyList(), familyRef)); - codec.verifiers.add(new ListMatchingVerifier("put state 1", p1.getFamilyCellMap().get(family), - familyRef)); - - codec.verifiers.add(new ListMatchingVerifier("cleanup state 2", p1.getFamilyCellMap().get(family), - familyRef)); - // kvs from both puts should be in the table now - codec.verifiers.add(new ListMatchingVerifier("put state 2", allKvs, familyRef)); - - // do the actual put (no indexing will actually be done) - HTable primary = state.table; - primary.setAutoFlush(false); - primary.put(Arrays.asList(p1, p2)); - primary.flushCommits(); - - // cleanup after ourselves - cleanup(state); - } - - /** - * @param tableName name of the table to create for the test - * @return the supporting state for the test - */ - private TestState setupTest(String tableName) throws IOException { - byte[] tableNameBytes = Bytes.toBytes(tableName); - @SuppressWarnings("deprecation") - HTableDescriptor desc = new HTableDescriptor(tableNameBytes); - desc.addFamily(FAM1); - // add the necessary simple options to create the builder - Map<String, String> indexerOpts = new HashMap<String, String>(); - // just need to set the codec - we are going to set it later, but we need something here or the - // initializer blows up. - indexerOpts.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, - CoveredIndexCodecForTesting.class.getName()); - Indexer.enableIndexing(desc, NonTxIndexBuilder.class, indexerOpts, Coprocessor.PRIORITY_USER); - - // create the table - HBaseAdmin admin = UTIL.getHBaseAdmin(); - admin.createTable(desc); - HTable primary = new HTable(UTIL.getConfiguration(), tableNameBytes); - - // overwrite the codec so we can verify the current state - Region region = UTIL.getMiniHBaseCluster().getRegions(tableNameBytes).get(0); - Indexer indexer = - (Indexer) region.getCoprocessorHost().findCoprocessor(Indexer.class.getName()); - NonTxIndexBuilder builder = - (NonTxIndexBuilder) indexer.getBuilderForTesting(); - VerifyingIndexCodec codec = new VerifyingIndexCodec(); - builder.setIndexCodecForTesting(codec); - - // setup the Puts we want to write - final long ts = System.currentTimeMillis(); - EnvironmentEdge edge = new EnvironmentEdge() { - - @Override - public long currentTime() { - return ts; - } - }; - EnvironmentEdgeManager.injectEdge(edge); - - return new TestState(primary, codec, ts); - } - - /** - * Cleanup the test based on the passed state. 
- * @param state - */ - private void cleanup(TestState state) throws IOException { - EnvironmentEdgeManager.reset(); - state.table.close(); - UTIL.deleteTable(state.table.getTableName()); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/06f58d56/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/FailWithoutRetriesIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/FailWithoutRetriesIT.java b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/FailWithoutRetriesIT.java deleted file mode 100644 index ba8340c..0000000 --- a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/FailWithoutRetriesIT.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by - * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language - * governing permissions and limitations under the License. - */ -package org.apache.phoenix.hbase.index.covered; - -import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster; -import static org.junit.Assert.fail; - -import java.io.IOException; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.DoNotRetryIOException; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; -import org.apache.phoenix.hbase.index.IndexTestingUtils; -import org.apache.phoenix.hbase.index.Indexer; -import org.apache.phoenix.hbase.index.TableName; -import org.apache.phoenix.hbase.index.builder.BaseIndexCodec; -import org.apache.phoenix.hbase.index.covered.ColumnGroup; -import org.apache.phoenix.hbase.index.covered.CoveredColumn; -import org.apache.phoenix.hbase.index.covered.CoveredColumnIndexSpecifierBuilder; -import org.apache.phoenix.hbase.index.covered.IndexMetaData; -import org.apache.phoenix.hbase.index.covered.IndexUpdate; -import org.apache.phoenix.hbase.index.covered.TableState; -import org.apache.phoenix.hbase.index.util.IndexManagementUtil; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -/** - * If {@link DoNotRetryIOException} is not subclassed correctly (with the {@link String} constructor), - * {@link MultiResponse#readFields(java.io.DataInput)} will not correctly deserialize the exception, and just return - * <tt>null</tt> 
to the client, which then just goes and retries. - */ -@Category(NeedsOwnMiniClusterTest.class) -public class FailWithoutRetriesIT { - - private static final Log LOG = LogFactory.getLog(FailWithoutRetriesIT.class); - @Rule - public TableName table = new TableName(); - - private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); - - private String getIndexTableName() { - return Bytes.toString(table.getTableName()) + "_index"; - } - - public static class FailingTestCodec extends BaseIndexCodec { - - @Override - public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context) throws IOException { - throw new RuntimeException("Intentionally failing deletes for " + FailWithoutRetriesIT.class.getName()); - } - - @Override - public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData context) throws IOException { - throw new RuntimeException("Intentionally failing upserts for " + FailWithoutRetriesIT.class.getName()); - } - } - - @BeforeClass - public static void setupCluster() throws Exception { - // setup and verify the config - Configuration conf = UTIL.getConfiguration(); - setUpConfigForMiniCluster(conf); - IndexTestingUtils.setupConfig(conf); - IndexManagementUtil.ensureMutableIndexingCorrectlyConfigured(conf); - // start the cluster - UTIL.startMiniCluster(); - } - - /** - * If this test times out, then we didn't fail quickly enough. {@link Indexer} maybe isn't rethrowing the exception - * correctly? - * <p> - * We use a custom codec to enforce the thrown exception. - * - * @throws Exception - */ - @Test(timeout = 300000) - public void testQuickFailure() throws Exception { - // incorrectly setup indexing for the primary table - target index table doesn't exist, which - // should quickly return to the client - byte[] family = Bytes.toBytes("family"); - ColumnGroup fam1 = new ColumnGroup(getIndexTableName()); - // values are [col1] - fam1.add(new CoveredColumn(family, CoveredColumn.ALL_QUALIFIERS)); - CoveredColumnIndexSpecifierBuilder builder = new CoveredColumnIndexSpecifierBuilder(); - // add the index family - builder.addIndexGroup(fam1); - // usually, we would create the index table here, but we don't for the sake of the test. 
-
-        // setup the primary table
-        String primaryTable = Bytes.toString(table.getTableName());
-        @SuppressWarnings("deprecation")
-        HTableDescriptor pTable = new HTableDescriptor(primaryTable);
-        pTable.addFamily(new HColumnDescriptor(family));
-        // override the codec so we can use our test one
-        builder.build(pTable, FailingTestCodec.class);
-
-        // create the primary table
-        HBaseAdmin admin = UTIL.getHBaseAdmin();
-        admin.createTable(pTable);
-        Configuration conf = new Configuration(UTIL.getConfiguration());
-        // up the number of retries/wait time to make it obvious that we are failing with retries here
-        conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 20);
-        conf.setLong(HConstants.HBASE_CLIENT_PAUSE, 1000);
-        HTable primary = new HTable(conf, primaryTable);
-        primary.setAutoFlush(false, true);
-
-        // do a simple put that should be indexed
-        Put p = new Put(Bytes.toBytes("row"));
-        p.add(family, null, Bytes.toBytes("value"));
-        primary.put(p);
-        try {
-            primary.flushCommits();
-            fail("Shouldn't have gotten a successful write to the primary table");
-        } catch (RetriesExhaustedWithDetailsException e) {
-            LOG.info("Correctly got a failure of the put!");
-        }
-        primary.close();
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/06f58d56/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
index c76e19c..9286c2e 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
@@ -44,6 +44,8 @@ import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.transaction.PhoenixTransactionContext;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.StringUtil;
 import org.apache.phoenix.util.TestUtil;
@@ -52,6 +54,27 @@ import org.junit.Test;
 public class TransactionIT extends ParallelStatsDisabledIT {
 
     @Test
+    public void testQueryWithSCN() throws Exception {
+        String tableName = generateUniqueName();
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);) {
+            conn.createStatement().execute(
+                    "CREATE TABLE " + tableName + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR) TRANSACTIONAL=true");
+        }
+        props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(EnvironmentEdgeManager.currentTimeMillis()));
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);) {
+            try {
+                conn.createStatement().executeQuery("SELECT * FROM " + tableName);
+                fail();
+            } catch (SQLException e) {
+                assertEquals("Unexpected Exception",
+                        SQLExceptionCode.CANNOT_START_TRANSACTION_WITH_SCN_SET
+                                .getErrorCode(), e.getErrorCode());
+            }
+        }
+    }
+
+    @Test
     public void testReCreateTxnTableAfterDroppingExistingNonTxnTable() throws SQLException {
         String tableName = generateUniqueName();
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/06f58d56/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java
index 6448edc..4e5580a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java
@@ -52,7 +52,6 @@ import org.apache.phoenix.parse.PrimaryKeyConstraint;
 import org.apache.phoenix.parse.SQLParser;
 import org.apache.phoenix.parse.SelectStatement;
 import org.apache.phoenix.parse.TableName;
-import org.apache.phoenix.query.DelegateConnectionQueryServices;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.ColumnRef;
 import org.apache.phoenix.schema.MetaDataClient;
@@ -84,7 +83,6 @@ public class CreateTableCompiler {
         final PhoenixConnection connection = statement.getConnection();
         ColumnResolver resolver = FromCompiler.getResolverForCreation(create, connection);
         PTableType type = create.getTableType();
-        PhoenixConnection connectionToBe = connection;
         PTable parentToBe = null;
         ViewType viewTypeToBe = null;
         Scan scan = new Scan();
@@ -148,24 +146,6 @@ public class CreateTableCompiler {
                 viewStatementToBe = QueryUtil.getViewStatement(baseTableName.getSchemaName(), baseTableName.getTableName(), buf.toString());
             }
             if (viewTypeToBe != ViewType.MAPPED) {
-                Long scn = connection.getSCN();
-                connectionToBe = (scn != null || tableRef.getTable().isTransactional()) ? connection :
-                    // If we have no SCN on our connection and the base table is not transactional, freeze the SCN at when
-                    // the base table was resolved to prevent any race condition on
-                    // the error checking we do for the base table. The only potential
-                    // issue is if the base table lives on a different region server
-                    // than the new table will, then we're relying here on the system
-                    // clocks being in sync.
-                    new PhoenixConnection(
-                        // When the new table is created, we still want to cache it
-                        // on our connection.
-                        new DelegateConnectionQueryServices(connection.getQueryServices()) {
-                            @Override
-                            public void addTable(PTable table, long resolvedTime) throws SQLException {
-                                connection.addTable(table, resolvedTime);
-                            }
-                        },
-                        connection, tableRef.getCurrentTime()+1);
                 viewColumnConstantsToBe = new byte[nColumns][];
                 ViewWhereExpressionVisitor visitor = new ViewWhereExpressionVisitor(parentToBe, viewColumnConstantsToBe);
                 where.accept(visitor);
@@ -201,7 +181,7 @@
                     throw new SQLExceptionInfo.Builder(SQLExceptionCode.SPLIT_POINT_NOT_CONSTANT)
                         .setMessage("Node: " + node).build().buildException();
                 }
-        final MetaDataClient client = new MetaDataClient(connectionToBe);
+        final MetaDataClient client = new MetaDataClient(connection);
         final PTable parent = parentToBe;
         return new BaseMutationPlan(context, operation) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/06f58d56/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 6f45e28..6445894 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -352,8 +352,12 @@
         //  - transactional table with a connection having an SCN
         if (table.getType() == PTableType.VIEW && table.getViewType().isReadOnly()) {
             throw new ReadOnlyTableException(schemaName,tableName);
-        }
-        else if (table.isTransactional() && connection.getSCN() != null) {
+        } else if (connection.isBuildingIndex() && table.getType() != PTableType.INDEX) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.ONLY_INDEX_UPDATABLE_AT_SCN)
+                .setSchemaName(schemaName)
+                .setTableName(tableName)
+                .build().buildException();
+        } else if (table.isTransactional() && connection.getSCN() != null) {
             throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_SPECIFY_SCN_FOR_TXN_TABLE).setSchemaName(schemaName)
                 .setTableName(tableName).build().buildException();
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/06f58d56/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
index 8f02901..5717c70 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
@@ -530,9 +530,6 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
             int indexRebuildRpcRetriesCounter =
                     config.getInt(QueryServices.INDEX_REBUILD_RPC_RETRIES_COUNTER,
                         QueryServicesOptions.DEFAULT_INDEX_REBUILD_RPC_RETRIES_COUNTER);
-            // Set SCN so that we don't ping server and have the upper bound set back to
-            // the timestamp when the failure occurred.
-            props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(Long.MAX_VALUE));
             // Set various phoenix and hbase level timeouts and rpc retries
             props.setProperty(QueryServices.THREAD_TIMEOUT_MS_ATTRIB,
                 Long.toString(indexRebuildQueryTimeoutMs));

http://git-wip-us.apache.org/repos/asf/phoenix/blob/06f58d56/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 76cc2ba..f25f7f1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -183,7 +183,8 @@ public enum SQLExceptionCode {
     ROWTIMESTAMP_NOT_ALLOWED_ON_VIEW(531, "42908", "Declaring a column as row_timestamp is not allowed for views."),
     INVALID_SCN(532, "42909", "Value of SCN cannot be less than zero."),
     INVALID_REPLAY_AT(533, "42910", "Value of REPLAY_AT cannot be less than zero."),
-    UNEQUAL_SCN_AND_REPLAY_AT(534, "42911", "If both specified, values of CURRENT_SCN and REPLAY_AT must be equal."),
+    UNEQUAL_SCN_AND_BUILD_INDEX_AT(534, "42911", "If both specified, values of CURRENT_SCN and BUILD_INDEX_AT must be equal."),
+    ONLY_INDEX_UPDATABLE_AT_SCN(535, "42912", "Only an index may be updated when the BUILD_INDEX_AT property is specified"),
 
     /**
      * HBase and Phoenix specific implementation defined sub-classes.
      * Column family related exceptions.

http://git-wip-us.apache.org/repos/asf/phoenix/blob/06f58d56/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index f8cf9ad..4f96207 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -585,14 +585,6 @@ public class MutationState implements SQLCloseable {
                 rowMutationsPertainingToIndex = rowMutations;
             }
             mutationList.addAll(rowMutations);
-            if (connection.isReplayMutations()) {
-                // Propagate IGNORE_NEWER_MUTATIONS when replaying mutations since there will be
-                // future dated data row mutations that will get in the way of generating the
-                // correct index rows on replay.
-                for (Mutation mutation : rowMutations) {
-                    mutation.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES, BaseScannerRegionObserver.REPLAY_TABLE_AND_INDEX_WRITES);
-                }
-            }
             if (mutationsPertainingToIndex != null) mutationsPertainingToIndex
                 .addAll(rowMutationsPertainingToIndex);
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/06f58d56/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
index e9d735d..8957b30 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
@@ -99,7 +99,6 @@ import org.apache.phoenix.trace.TracingUtils;
 import org.apache.phoenix.trace.util.NullSpan;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.IndexUtil;
-import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryUtil;
@@ -395,14 +394,6 @@ public class Indexer extends BaseRegionObserver {
                 "Somehow didn't return an index update but also didn't propagate the failure to the client!");
     }
 
-    // Assume time stamp of mutation is a client defined time stamp if it's not within
-    // a factor of ten of the current time.
-    // TODO: get rid of this and have client pass LATEST_TIMESTAMP unless an SCN is set
-    private static boolean isProbablyClientControlledTimeStamp(Mutation m) {
-        double ratio = EnvironmentEdgeManager.currentTimeMillis() / MetaDataUtil.getClientTimeStamp(m);
-        return ratio > 10 || ratio < 0.10;
-    }
-
     private static void setTimeStamp(KeyValue kv, byte[] tsBytes) {
         int tsOffset = kv.getTimestampOffset();
         System.arraycopy(tsBytes, 0, kv.getBuffer(), tsOffset, Bytes.SIZEOF_LONG);
@@ -470,7 +461,7 @@ public class Indexer extends BaseRegionObserver {
         Mutation firstMutation = miniBatchOp.getOperation(0);
         ReplayWrite replayWrite = this.builder.getReplayWrite(firstMutation);
-        boolean resetTimeStamp = replayWrite == null && !isProbablyClientControlledTimeStamp(firstMutation);
+        boolean resetTimeStamp = replayWrite == null;
         long now = EnvironmentEdgeManager.currentTimeMillis();
         byte[] byteNow = Bytes.toBytes(now);
         for (int i = 0; i < miniBatchOp.size(); i++) {
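The UpsertCompiler and SQLExceptionCode hunks above also appear to carve out the one case where writes at a fixed timestamp remain legal: index builds. A hypothetical sketch, assuming the BUILD_INDEX_AT connection property that the PhoenixRuntime.java change (not shown in the truncated diff) introduces; the property name is inferred from the new error message, and the URL and table names are placeholders:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;

public class BuildIndexAtExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Assumed property name, inferred from the ONLY_INDEX_UPDATABLE_AT_SCN
        // message; the actual constant would live in PhoenixRuntime.
        props.put("BuildIndexAt", Long.toString(System.currentTimeMillis()));
        try (Connection conn =
                DriverManager.getConnection("jdbc:phoenix:localhost", props)) {
            // With the build-index timestamp set, connection.isBuildingIndex()
            // is true, so only tables of type INDEX may be written; a data-table
            // upsert now fails in UpsertCompiler with ONLY_INDEX_UPDATABLE_AT_SCN
            // (error code 535, SQLSTATE 42912).
            conn.createStatement().execute(
                    "UPSERT INTO MY_TABLE (K, V1) VALUES ('a', 'b')");
        } catch (SQLException e) {
            System.err.println("Error code " + e.getErrorCode()); // expect 535
        }
    }
}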
