Repository: phoenix Updated Branches: refs/heads/txn 0a9561172 -> 3c0a56db8
more review feedback Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/3c0a56db Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/3c0a56db Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/3c0a56db Branch: refs/heads/txn Commit: 3c0a56db8fade0a0b891ce9f0317a47c7acc5769 Parents: 0a95611 Author: Thomas D'Silva <[email protected]> Authored: Thu Nov 19 16:29:15 2015 -0800 Committer: Thomas D'Silva <[email protected]> Committed: Thu Nov 19 16:29:15 2015 -0800 ---------------------------------------------------------------------- .../end2end/index/MutableIndexFailureIT.java | 115 ++++++++++--------- .../coprocessor/BaseScannerRegionObserver.java | 6 +- .../coprocessor/MetaDataEndpointImpl.java | 1 + .../phoenix/coprocessor/MetaDataProtocol.java | 5 +- .../phoenix/exception/SQLExceptionCode.java | 2 + .../apache/phoenix/execute/BaseQueryPlan.java | 1 + .../apache/phoenix/execute/MutationState.java | 29 +++-- .../apache/phoenix/index/IndexMaintainer.java | 1 - .../phoenix/index/IndexMetaDataCacheClient.java | 1 - .../apache/phoenix/jdbc/PhoenixStatement.java | 2 +- .../query/ConnectionlessQueryServicesImpl.java | 7 +- .../org/apache/phoenix/schema/TableRef.java | 5 +- .../phoenix/trace/PhoenixMetricsSink.java | 4 + 13 files changed, 97 insertions(+), 82 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c0a56db/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java index 9c6b161..bbe6f64 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java @@ -71,9 +71,9 @@ import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.StringUtil; +import org.apache.phoenix.util.TestUtil; import org.junit.After; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; @@ -93,15 +93,14 @@ import org.junit.runners.Parameterized.Parameters; @RunWith(Parameterized.class) public class MutableIndexFailureIT extends BaseTest { private static final int NUM_SLAVES = 4; - private static String url; private static PhoenixTestDriver driver; private static HBaseTestingUtility util; private Timer scheduleTimer; - private static final String SCHEMA_NAME = "S"; - private static final String INDEX_TABLE_NAME = "I"; - private static final String DATA_TABLE_FULL_NAME = SchemaUtil.getTableName(SCHEMA_NAME, "T"); - private static final String INDEX_TABLE_FULL_NAME = SchemaUtil.getTableName(SCHEMA_NAME, "I"); + private String tableName; + private String indexName; + private String fullTableName; + private String fullIndexName; private boolean transactional; private final String tableDDLOptions; @@ -128,12 +127,21 @@ public class MutableIndexFailureIT extends BaseTest { url = JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + LOCALHOST + JDBC_PROTOCOL_SEPARATOR + clientPort + JDBC_PROTOCOL_TERMINATOR + PHOENIX_TEST_DRIVER_URL_PARAM; driver = initAndRegisterDriver(url, ReadOnlyProps.EMPTY_PROPS); + clusterInitialized = true; + setupTxManager(); } @Parameters(name = "transactional = {0}") public static Collection<Boolean[]> data() { return Arrays.asList(new Boolean[][] { { false }, { true } }); } + + private void setTableNames() { + tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + System.currentTimeMillis(); + indexName = "IDX" + "_" + System.currentTimeMillis(); + fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName); + fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName); + } @After public void tearDown() throws Exception { @@ -151,53 +159,54 @@ public class MutableIndexFailureIT extends BaseTest { } } - @Ignore("See PHOENIX-2331") +// @Ignore("See PHOENIX-2331") @Test(timeout=300000) public void testWriteFailureDisablesLocalIndex() throws Exception { helpTestWriteFailureDisablesIndex(true); } - @Ignore("See PHOENIX-2332") +// @Ignore("See PHOENIX-2332") @Test(timeout=300000) public void testWriteFailureDisablesIndex() throws Exception { helpTestWriteFailureDisablesIndex(false); } public void helpTestWriteFailureDisablesIndex(boolean localIndex) throws Exception { + setTableNames(); Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); try (Connection conn = driver.connect(url, props)) { String query; ResultSet rs; conn.setAutoCommit(false); conn.createStatement().execute( - "CREATE TABLE " + DATA_TABLE_FULL_NAME + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"); - query = "SELECT * FROM " + DATA_TABLE_FULL_NAME; + "CREATE TABLE " + fullTableName + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) "+tableDDLOptions); + query = "SELECT * FROM " + fullTableName; rs = conn.createStatement().executeQuery(query); assertFalse(rs.next()); if(localIndex) { conn.createStatement().execute( - "CREATE LOCAL INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_FULL_NAME + " (v1) INCLUDE (v2)"); + "CREATE LOCAL INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE (v2)"); conn.createStatement().execute( - "CREATE LOCAL INDEX " + INDEX_TABLE_NAME+ "_2" + " ON " + DATA_TABLE_FULL_NAME + " (v2) INCLUDE (v1)"); + "CREATE LOCAL INDEX " + indexName+ "_2" + " ON " + fullTableName + " (v2) INCLUDE (v1)"); } else { conn.createStatement().execute( - "CREATE INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_FULL_NAME + " (v1) INCLUDE (v2)"); + "CREATE INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE (v2)"); } - query = "SELECT * FROM " + INDEX_TABLE_FULL_NAME; + query = "SELECT * FROM " + fullIndexName; rs = conn.createStatement().executeQuery(query); assertFalse(rs.next()); // Verify the metadata for index is correct. - rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME, + rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(TestUtil.DEFAULT_SCHEMA_NAME), indexName, new String[] { PTableType.INDEX.toString() }); assertTrue(rs.next()); - assertEquals(INDEX_TABLE_NAME, rs.getString(3)); + assertEquals(indexName, rs.getString(3)); assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE")); assertFalse(rs.next()); - PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + DATA_TABLE_FULL_NAME + " VALUES(?,?,?)"); + PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)"); stmt.setString(1, "a"); stmt.setString(2, "x"); stmt.setString(3, "1"); @@ -206,7 +215,7 @@ public class MutableIndexFailureIT extends BaseTest { TableName indexTable = TableName.valueOf(localIndex ? MetaDataUtil - .getLocalIndexTableName(DATA_TABLE_FULL_NAME) : INDEX_TABLE_FULL_NAME); + .getLocalIndexTableName(fullTableName) : fullIndexName); HBaseAdmin admin = this.util.getHBaseAdmin(); HTableDescriptor indexTableDesc = admin.getTableDescriptor(indexTable); try{ @@ -214,7 +223,7 @@ public class MutableIndexFailureIT extends BaseTest { admin.deleteTable(indexTable); } catch (TableNotFoundException ignore) {} - stmt = conn.prepareStatement("UPSERT INTO " + DATA_TABLE_FULL_NAME + " VALUES(?,?,?)"); + stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)"); stmt.setString(1, "a2"); stmt.setString(2, "x2"); stmt.setString(3, "2"); @@ -223,13 +232,8 @@ public class MutableIndexFailureIT extends BaseTest { try { conn.commit(); fail(); - } catch (SQLException e1) { - try { - conn.rollback(); - fail(); - } catch (SQLException e2) { - // rollback fails as well because index is disabled - } + } catch (SQLException e) { + conn.rollback(); } } else { @@ -237,18 +241,20 @@ public class MutableIndexFailureIT extends BaseTest { } // Verify the metadata for index is correct. - rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME, + rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(TestUtil.DEFAULT_SCHEMA_NAME), indexName, new String[] { PTableType.INDEX.toString() }); assertTrue(rs.next()); - assertEquals(INDEX_TABLE_NAME, rs.getString(3)); - assertEquals(PIndexState.DISABLE.toString(), rs.getString("INDEX_STATE")); + assertEquals(indexName, rs.getString(3)); + // the index is only disabled for non-txn tables upon index table write failure + PIndexState indexState = transactional ? PIndexState.ACTIVE : PIndexState.DISABLE; + assertEquals(indexState.toString(), rs.getString("INDEX_STATE")); assertFalse(rs.next()); if(localIndex) { - rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME+"_2", + rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(TestUtil.DEFAULT_SCHEMA_NAME), indexName + "_2", new String[] { PTableType.INDEX.toString() }); assertTrue(rs.next()); - assertEquals(INDEX_TABLE_NAME+"_2", rs.getString(3)); - assertEquals(PIndexState.DISABLE.toString(), rs.getString("INDEX_STATE")); + assertEquals(indexName + "_2", rs.getString(3)); + assertEquals(indexState.toString(), rs.getString("INDEX_STATE")); assertFalse(rs.next()); } @@ -256,7 +262,7 @@ public class MutableIndexFailureIT extends BaseTest { // index has not been disabled if (!transactional) { // Verify UPSERT on data table still work after index is disabled - stmt = conn.prepareStatement("UPSERT INTO " + DATA_TABLE_FULL_NAME + " VALUES(?,?,?)"); + stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)"); stmt.setString(1, "a3"); stmt.setString(2, "x3"); stmt.setString(3, "3"); @@ -267,10 +273,10 @@ public class MutableIndexFailureIT extends BaseTest { if (transactional) { // if the table was transactional there should be 1 row (written before the index // was disabled) - query = "SELECT /*+ NO_INDEX */ v2 FROM " + DATA_TABLE_FULL_NAME; + query = "SELECT /*+ NO_INDEX */ v2 FROM " + fullTableName; rs = conn.createStatement().executeQuery("EXPLAIN " + query); String expectedPlan = - "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + DATA_TABLE_FULL_NAME; + "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + fullTableName; assertEquals(expectedPlan, QueryUtil.getExplainPlan(rs)); rs = conn.createStatement().executeQuery(query); assertTrue(rs.next()); @@ -279,10 +285,10 @@ public class MutableIndexFailureIT extends BaseTest { } else { // if the table was not transactional there should be three rows (all writes to data // table should succeed) - query = "SELECT v2 FROM " + DATA_TABLE_FULL_NAME; + query = "SELECT v2 FROM " + fullTableName; rs = conn.createStatement().executeQuery("EXPLAIN " + query); String expectedPlan = - "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + DATA_TABLE_FULL_NAME; + "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + fullTableName; assertEquals(expectedPlan, QueryUtil.getExplainPlan(rs)); rs = conn.createStatement().executeQuery(query); assertTrue(rs.next()); @@ -298,14 +304,14 @@ public class MutableIndexFailureIT extends BaseTest { admin.createTable(indexTableDesc); do { Thread.sleep(15 * 1000); // sleep 15 secs - rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME, + rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(TestUtil.DEFAULT_SCHEMA_NAME), indexName, new String[] { PTableType.INDEX.toString() }); assertTrue(rs.next()); if(PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))){ break; } if(localIndex) { - rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME+"_2", + rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(TestUtil.DEFAULT_SCHEMA_NAME), indexName + "_2", new String[] { PTableType.INDEX.toString() }); assertTrue(rs.next()); if(PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))){ @@ -315,7 +321,7 @@ public class MutableIndexFailureIT extends BaseTest { } while(true); // Verify UPSERT on data table still work after index table is recreated - stmt = conn.prepareStatement("UPSERT INTO " + DATA_TABLE_FULL_NAME + " VALUES(?,?,?)"); + stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)"); stmt.setString(1, "a4"); stmt.setString(2, "x4"); stmt.setString(3, "4"); @@ -323,7 +329,7 @@ public class MutableIndexFailureIT extends BaseTest { conn.commit(); // verify index table has data - query = "SELECT count(1) FROM " + INDEX_TABLE_FULL_NAME; + query = "SELECT count(1) FROM " + fullIndexName; rs = conn.createStatement().executeQuery(query); assertTrue(rs.next()); @@ -337,6 +343,7 @@ public class MutableIndexFailureIT extends BaseTest { @Test(timeout=300000) public void testWriteFailureWithRegionServerDown() throws Exception { + setTableNames(); String query; ResultSet rs; @@ -344,26 +351,26 @@ public class MutableIndexFailureIT extends BaseTest { try (Connection conn = driver.connect(url, props);) { conn.setAutoCommit(false); conn.createStatement().execute( - "CREATE TABLE " + DATA_TABLE_FULL_NAME + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"); - query = "SELECT * FROM " + DATA_TABLE_FULL_NAME; + "CREATE TABLE " + fullTableName + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) "+tableDDLOptions); + query = "SELECT * FROM " + fullTableName; rs = conn.createStatement().executeQuery(query); assertFalse(rs.next()); conn.createStatement().execute( - "CREATE INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_FULL_NAME + " (v1) INCLUDE (v2)"); - query = "SELECT * FROM " + INDEX_TABLE_FULL_NAME; + "CREATE INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE (v2)"); + query = "SELECT * FROM " + fullIndexName; rs = conn.createStatement().executeQuery(query); assertFalse(rs.next()); // Verify the metadata for index is correct. - rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME, + rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(TestUtil.DEFAULT_SCHEMA_NAME), indexName, new String[] { PTableType.INDEX.toString() }); assertTrue(rs.next()); - assertEquals(INDEX_TABLE_NAME, rs.getString(3)); + assertEquals(indexName, rs.getString(3)); assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE")); assertFalse(rs.next()); - PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + DATA_TABLE_FULL_NAME + " VALUES(?,?,?)"); + PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)"); stmt.setString(1, "a"); stmt.setString(2, "x"); stmt.setString(3, "1"); @@ -372,7 +379,7 @@ public class MutableIndexFailureIT extends BaseTest { // find a RS which doesn't has CATALOG table TableName catalogTable = TableName.valueOf("SYSTEM.CATALOG"); - TableName indexTable = TableName.valueOf(INDEX_TABLE_FULL_NAME); + TableName indexTable = TableName.valueOf(fullIndexName); final HBaseCluster cluster = this.util.getHBaseCluster(); Collection<ServerName> rss = cluster.getClusterStatus().getServers(); HBaseAdmin admin = this.util.getHBaseAdmin(); @@ -406,7 +413,7 @@ public class MutableIndexFailureIT extends BaseTest { // use timer sending updates in every 10ms this.scheduleTimer = new Timer(true); - this.scheduleTimer.schedule(new SendingUpdatesScheduleTask(conn), 0, 10); + this.scheduleTimer.schedule(new SendingUpdatesScheduleTask(conn, fullTableName), 0, 10); // let timer sending some updates Thread.sleep(100); @@ -419,7 +426,7 @@ public class MutableIndexFailureIT extends BaseTest { // Verify the metadata for index is correct. do { Thread.sleep(15 * 1000); // sleep 15 secs - rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME, + rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(TestUtil.DEFAULT_SCHEMA_NAME), indexName, new String[] { PTableType.INDEX.toString() }); assertTrue(rs.next()); if(PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))){ @@ -439,10 +446,12 @@ public class MutableIndexFailureIT extends BaseTest { // running private final static AtomicInteger inProgress = new AtomicInteger(0); private final Connection conn; + private final String fullTableName; private int inserts = 0; - public SendingUpdatesScheduleTask(Connection conn) { + public SendingUpdatesScheduleTask(Connection conn, String fullTableName) { this.conn = conn; + this.fullTableName = fullTableName; } public void run() { @@ -453,7 +462,7 @@ public class MutableIndexFailureIT extends BaseTest { try { inProgress.incrementAndGet(); inserts++; - PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + DATA_TABLE_FULL_NAME + " VALUES(?,?,?)"); + PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)"); stmt.setString(1, "a" + inserts); stmt.setString(2, "x" + inserts); stmt.setString(3, String.valueOf(inserts)); http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c0a56db/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java index 6f2f72d..d720806 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java @@ -21,8 +21,6 @@ import java.io.IOException; import java.util.List; import java.util.Set; -import co.cask.tephra.Transaction; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CoprocessorEnvironment; @@ -59,6 +57,8 @@ import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.ServerUtil; +import co.cask.tephra.Transaction; + import com.google.common.collect.ImmutableList; @@ -246,7 +246,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { * @param tupleProjector * @param dataRegion * @param indexMaintainer - * @param tx TODO + * @param tx current transaction * @param viewConstants */ protected RegionScanner getWrappedScanner(final ObserverContext<RegionCoprocessorEnvironment> c, http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c0a56db/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java index facbb88..5743f50 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java @@ -1255,6 +1255,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso } // TODO: Switch this to HRegion#batchMutate when we want to support indexes on the // system table. Basically, we get all the locks that we don't already hold for all the + // tableMetadata rows. This ensures we don't have deadlock situations (ensuring // primary and then index table locks are held, in that order). For now, we just don't support // indexing on the system table. This is an issue because of the way we manage batch mutation // in the Indexer. http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c0a56db/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java index 22a10a4..ba06828 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java @@ -23,9 +23,9 @@ import java.util.List; import org.apache.hadoop.hbase.util.ByteStringer; import org.apache.phoenix.coprocessor.generated.MetaDataProtos; -import org.apache.phoenix.coprocessor.generated.PFunctionProtos; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataService; +import org.apache.phoenix.coprocessor.generated.PFunctionProtos; import org.apache.phoenix.hbase.index.util.VersionUtil; import org.apache.phoenix.parse.PFunction; import org.apache.phoenix.schema.PColumn; @@ -74,7 +74,8 @@ public abstract class MetaDataProtocol extends MetaDataService { public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_5_0 = MIN_TABLE_TIMESTAMP + 8; public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_6_0 = MIN_TABLE_TIMESTAMP + 9; public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 = MIN_TABLE_TIMESTAMP + 10; - public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_SYSTEM_TABLE_TIMESTAMP_4_6_0; + // MIN_SYSTEM_TABLE_TIMESTAMP needs to be set to the max of all the MIN_SYSTEM_TABLE_TIMESTAMP_* constants + public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0; // TODO: pare this down to minimum, as we don't need duplicates for both table and column errors, nor should we need // a different code for every type of error. // ENTITY_ALREADY_EXISTS, ENTITY_NOT_FOUND, NEWER_ENTITY_FOUND, ENTITY_NOT_IN_REGION, CONCURRENT_MODIFICATION http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c0a56db/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java index 7f40ed2..80d9de1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java @@ -273,6 +273,8 @@ public enum SQLExceptionCode { CANNOT_START_TRANSACTION_WITH_SCN_SET(1073, "44A04", "Cannot start a transaction on a connection with SCN set"), TX_MAX_VERSIONS_MUST_BE_GREATER_THAN_ONE(1074, "44A05", "A transactional table must define VERSION of greater than one"), CANNOT_SPECIFY_SCN_FOR_TXN_TABLE(1075, "44A06", "Cannot use a connection with SCN set for a transactional table"), + NULL_TRANSACTION_CONTEXT(1076, "42901", "No Tranasction Context available"), + TRANSACTION_FAILED(1077, "42901", "Transaction Failure "), /** Sequence related */ SEQUENCE_ALREADY_EXIST(1200, "42Z00", "Sequence already exists.", new Factory() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c0a56db/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java index 31b360f..0bdb65a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java @@ -223,6 +223,7 @@ public abstract class BaseQueryPlan implements QueryPlan { if (table.getType() != PTableType.SYSTEM) { scan.setConsistency(connection.getConsistency()); } + // TODO fix this in PHOENIX-2415 Support ROW_TIMESTAMP with transactional tables if (!table.isTransactional()) { // Get the time range of row_timestamp column TimeRange rowTimestampRange = context.getScanRanges().getRowTimestampRange(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c0a56db/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index 029f73f..1c91fe5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -84,12 +84,6 @@ import org.apache.phoenix.util.TransactionUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Predicate; -import com.google.common.collect.Iterators; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; - import co.cask.tephra.Transaction; import co.cask.tephra.Transaction.VisibilityLevel; import co.cask.tephra.TransactionAware; @@ -99,6 +93,12 @@ import co.cask.tephra.TransactionFailureException; import co.cask.tephra.TransactionSystemClient; import co.cask.tephra.hbase11.TransactionAwareHTable; +import com.google.common.base.Predicate; +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + /** * * Tracks the uncommitted state @@ -175,7 +175,7 @@ public class MutationState implements SQLCloseable { throwIfTooBig(); } - public boolean checkpoint(MutationPlan plan) throws SQLException { + public boolean checkpointIfNeccessary(MutationPlan plan) throws SQLException { Transaction currentTx = getTransaction(); if (getTransaction() == null || plan.getTargetRef() == null || plan.getTargetRef().getTable() == null || !plan.getTargetRef().getTable().isTransactional()) { return false; @@ -296,7 +296,7 @@ public class MutationState implements SQLCloseable { public boolean startTransaction() throws SQLException { if (txContext == null) { - throw new SQLException("No transaction context"); // TODO: error code + throw new SQLExceptionInfo.Builder(SQLExceptionCode.NULL_TRANSACTION_CONTEXT).build().buildException(); } if (connection.getSCN() != null) { @@ -312,7 +312,7 @@ public class MutationState implements SQLCloseable { return true; } } catch (TransactionFailureException e) { - throw new SQLException(e); // TODO: error code + throw new SQLExceptionInfo.Builder(SQLExceptionCode.TRANSACTION_FAILED).setRootCause(e).build().buildException(); } return false; } @@ -544,7 +544,6 @@ public class MutationState implements SQLCloseable { } Long scn = connection.getSCN(); final long timestamp = (tableTimestamp!=null && tableTimestamp!=QueryConstants.UNSET_TIMESTAMP) ? tableTimestamp : (scn == null ? HConstants.LATEST_TIMESTAMP : scn); -// final long timestamp = (scn == null ? HConstants.LATEST_TIMESTAMP : scn); return new Iterator<Pair<byte[],List<Mutation>>>() { private Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> current = iterator.next(); private Iterator<Pair<byte[],List<Mutation>>> innerIterator = init(); @@ -722,7 +721,6 @@ public class MutationState implements SQLCloseable { int i = 0; long[] serverTimeStamps = null; boolean sendAll = false; - // Validate up front if not transactional so that we if (tableRefIterator == null) { serverTimeStamps = validateAll(); tableRefIterator = mutations.keySet().iterator(); @@ -928,7 +926,7 @@ public class MutationState implements SQLCloseable { return cache; } - public void clear() throws SQLException { + private void clear() throws SQLException { this.mutations.clear(); numRows = 0; } @@ -986,7 +984,7 @@ public class MutationState implements SQLCloseable { } catch (TransactionFailureException e) { try { txContext.abort(e); - throw TransactionUtil.getSQLException(e); + throw new SQLExceptionInfo.Builder(SQLExceptionCode.TRANSACTION_FAILED).setRootCause(e).build().buildException(); } catch (TransactionFailureException e1) { throw TransactionUtil.getSQLException(e); } @@ -1000,8 +998,9 @@ public class MutationState implements SQLCloseable { } /** - * Send mutations to hbase, so they are visible to subsequent reads, - * starting a transaction if transactional and one has not yet been started. + * Support read-your-own-write semantics by sending uncommitted data to HBase prior to running a + * query. In this way, they are visible to subsequent reads but are not actually committed until + * commit is called. * @param tableRefs * @return true if at least partially transactional and false otherwise. * @throws SQLException http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c0a56db/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java index 9b051f2..ab7534a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java @@ -876,7 +876,6 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { } else if (kv.getTypeByte() == KeyValue.Type.DeleteFamily.getCode() // Since we don't include the index rows in the change set for txn tables, we need to detect row deletes that have transformed by TransactionProcessor - // TODO see if implement PhoenixTransactionalIndexer.preDelete will work instead of the following check || (CellUtil.matchingQualifier(kv, TxConstants.FAMILY_DELETE_QUALIFIER) && CellUtil.matchingValue(kv, HConstants.EMPTY_BYTE_ARRAY))) { nDeleteCF++; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c0a56db/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java index 1b0c599..05a01b9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java @@ -66,7 +66,6 @@ public class IndexMetaDataCacheClient { /** * Send the index metadata cahce to all region servers for regions that will handle the mutations. - * @param txState TODO * @return client-side {@link ServerCache} representing the added index metadata cache * @throws SQLException * @throws MaxServerCacheSizeExceededException if size of hash cache exceeds max allowed http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c0a56db/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java index 55ae8b3..bacea92 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java @@ -334,7 +334,7 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho } Iterator<TableRef> tableRefs = plan.getSourceRefs().iterator(); state.sendUncommitted(tableRefs); - state.checkpoint(plan); + state.checkpointIfNeccessary(plan); MutationState lastState = plan.execute(); state.join(lastState); if (connection.getAutoCommit()) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c0a56db/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java index 79337bf..8f726dc 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java @@ -83,13 +83,13 @@ import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.SequenceUtil; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - import co.cask.tephra.TransactionManager; import co.cask.tephra.TransactionSystemClient; import co.cask.tephra.inmemory.InMemoryTxSystemClient; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + /** * @@ -118,7 +118,6 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple // Use KeyValueBuilder that builds real KeyValues, as our test utils require this this.kvBuilder = GenericKeyValueBuilder.INSTANCE; - // TOOD: copy/paste from ConnectionQueryServicesImpl Configuration config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration(); for (Entry<String,String> entry : services.getProps()) { config.set(entry.getKey(), entry.getValue()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c0a56db/phoenix-core/src/main/java/org/apache/phoenix/schema/TableRef.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableRef.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableRef.java index f1ef456..ef8e811 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableRef.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableRef.java @@ -105,7 +105,7 @@ public class TableRef { public int hashCode() { final int prime = 31; int result = alias == null ? 0 : alias.hashCode(); - result = prime * result + ( this.table!=null && this.table.getName()!=null ? this.table.getName().getString().hashCode() : 0); + result = prime * result + ( this.table.getName()!=null ? this.table.getName().getString().hashCode() : 0); return result; } @@ -117,7 +117,8 @@ public class TableRef { TableRef other = (TableRef)obj; // a null alias on either side should mean a wildcard and should not fail the equals check if ((alias == null && other.alias != null) || (alias != null && !alias.equals(other.alias))) return false; - if (!table.getName().getString().equals(other.table.getName().getString())) return false; + if (((table.getName() == null && other.table.getName() != null) + || !table.getName().getString().equals(other.table.getName().getString()))) return false; return true; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c0a56db/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java b/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java index ee06179..c39df2c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java @@ -177,6 +177,10 @@ public class PhoenixMetricsSink implements MetricsSink { ANNOTATION_COUNT + " smallint" + " CONSTRAINT pk PRIMARY KEY (" + TRACE.columnName + ", " + PARENT.columnName + ", " + SPAN.columnName + "))\n" + + // We have a config parameter that can be set so that tables are + // transactional by default. If that's set, we still don't want these system + // tables created as transactional tables, make these table non + // transactional PhoenixDatabaseMetaData.TRANSACTIONAL + "=" + Boolean.FALSE; ; PreparedStatement stmt = conn.prepareStatement(ddl);
