Repository: phoenix Updated Branches: refs/heads/txn 4c7f0cbb4 -> 4f454a755
PHOENIX-1821 Implement mechanism to convert a table from non transactional to transactional Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/4f454a75 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/4f454a75 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/4f454a75 Branch: refs/heads/txn Commit: 4f454a755f0215f1d58c7e557f09af465c193608 Parents: 4c7f0cb Author: James Taylor <[email protected]> Authored: Wed Oct 28 19:11:49 2015 -0700 Committer: James Taylor <[email protected]> Committed: Wed Oct 28 19:11:49 2015 -0700 ---------------------------------------------------------------------- .../apache/phoenix/end2end/AlterTableIT.java | 42 ++- .../org/apache/phoenix/tx/TransactionIT.java | 152 +++++++++- .../phoenix/exception/SQLExceptionCode.java | 2 +- .../apache/phoenix/jdbc/PhoenixConnection.java | 6 +- .../query/ConnectionQueryServicesImpl.java | 289 +++++++++++++------ .../query/ConnectionlessQueryServicesImpl.java | 4 +- .../query/DelegateConnectionQueryServices.java | 4 +- .../apache/phoenix/query/MetaDataMutated.java | 2 +- .../apache/phoenix/schema/MetaDataClient.java | 51 +++- .../apache/phoenix/schema/PMetaDataImpl.java | 4 +- .../org/apache/phoenix/schema/PTableImpl.java | 4 +- .../apache/phoenix/schema/TableProperty.java | 2 +- 12 files changed, 407 insertions(+), 155 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/4f454a75/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java index 3004bd6..d871cda 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java @@ -38,8 +38,6 @@ import java.util.Collections; import java.util.Map; import java.util.Properties; -import co.cask.tephra.hbase98.coprocessor.TransactionProcessor; - import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeepDeletedCells; @@ -63,6 +61,9 @@ import org.apache.phoenix.util.SchemaUtil; import org.junit.BeforeClass; import org.junit.Test; +import co.cask.tephra.TxConstants; +import co.cask.tephra.hbase98.coprocessor.TransactionProcessor; + /** * * A lot of tests in this class test HBase level properties. As a result, @@ -1994,23 +1995,6 @@ public class AlterTableIT extends BaseOwnClusterHBaseManagedTimeIT { } @Test - public void testAlterTableToBeTransactional() throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(getUrl(), props); - String ddl = "CREATE TABLE test_table (k varchar primary key)"; - createTestTable(getUrl(), ddl); - - try { - ddl = "ALTER TABLE test_table SET transactional=true"; - conn.createStatement().execute(ddl); - fail(); - } catch (SQLException e) { - assertEquals(SQLExceptionCode.CANNOT_ALTER_PROPERTY.getErrorCode(),e.getErrorCode()); - } - } - - - @Test public void testCreateTableToBeTransactional() throws Exception { Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); Connection conn = DriverManager.getConnection(getUrl(), props); @@ -2022,19 +2006,31 @@ public class AlterTableIT extends BaseOwnClusterHBaseManagedTimeIT { assertTrue(table.isTransactional()); assertTrue(htable.getTableDescriptor().getCoprocessors().contains(TransactionProcessor.class.getName())); + try { + ddl = "ALTER TABLE TEST_TRANSACTIONAL_TABLE SET transactional=false"; + conn.createStatement().execute(ddl); + fail(); + } catch (SQLException e) { + assertEquals(SQLExceptionCode.TX_MAY_NOT_SWITCH_TO_NON_TX.getErrorCode(), e.getErrorCode()); + } + HBaseAdmin admin = pconn.getQueryServices().getAdmin(); HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("TXN_TEST_EXISTING")); desc.addFamily(new HColumnDescriptor(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES)); admin.createTable(desc); + ddl = "CREATE TABLE TXN_TEST_EXISTING (k varchar primary key) transactional=true"; + conn.createStatement().execute(ddl); + assertEquals(Boolean.TRUE.toString(), admin.getTableDescriptor(TableName.valueOf("TXN_TEST_EXISTING")).getValue(TxConstants.READ_NON_TX_DATA)); + + // Should be ok, as HBase metadata should match existing metadata. + ddl = "CREATE TABLE IF NOT EXISTS TEST_TRANSACTIONAL_TABLE (k varchar primary key)"; try { - ddl = "CREATE TABLE TXN_TEST_EXISTING (k varchar primary key) transactional=true"; conn.createStatement().execute(ddl); fail(); } catch (SQLException e) { - assertEquals(SQLExceptionCode.MAY_NOT_MAP_TO_EXISTING_TABLE_AS_TRANSACTIONAL.getErrorCode(), e.getErrorCode()); + assertEquals(SQLExceptionCode.TX_MAY_NOT_SWITCH_TO_NON_TX.getErrorCode(), e.getErrorCode()); } - // Should be ok, as HBase metadata should match existing metadata. - ddl = "CREATE TABLE IF NOT EXISTS TEST_TRANSACTIONAL_TABLE (k varchar primary key)"; + ddl += " transactional=true"; conn.createStatement().execute(ddl); table = pconn.getTable(new PTableKey(null, "TEST_TRANSACTIONAL_TABLE")); htable = pconn.getQueryServices().getTable(Bytes.toBytes("TEST_TRANSACTIONAL_TABLE")); http://git-wip-us.apache.org/repos/asf/phoenix/blob/4f454a75/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java index 52b5b5f..373ea99 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java @@ -10,7 +10,6 @@ package org.apache.phoenix.tx; import static org.apache.phoenix.util.TestUtil.INDEX_DATA_SCHEMA; -import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; import static org.apache.phoenix.util.TestUtil.TRANSACTIONAL_DATA_TABLE; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -22,26 +21,25 @@ import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; -import java.sql.Statement; -import java.util.Map; -import java.util.Properties; +import co.cask.tephra.hbase98.coprocessor.TransactionProcessor; + +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT; -import org.apache.phoenix.end2end.Shadower; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.query.QueryConstants; -import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.schema.PTableKey; -import org.apache.phoenix.util.PropertiesUtil; -import org.apache.phoenix.util.ReadOnlyProps; +import org.apache.phoenix.schema.types.PInteger; +import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.TestUtil; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Test; -import com.google.common.collect.Maps; - public class TransactionIT extends BaseHBaseManagedTimeIT { private static final String FULL_TABLE_NAME = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + TRANSACTIONAL_DATA_TABLE; @@ -119,8 +117,6 @@ public class TransactionIT extends BaseHBaseManagedTimeIT { conn1.commit(); // verify rows are deleted after commit - // FIXME: this is failing, I think because Tephra isn't handling deletes like we need it to - // TODO: confirm this works once we get the patch from Gary. rs = conn1.createStatement().executeQuery(selectSQL); assertFalse(rs.next()); } @@ -245,5 +241,133 @@ public class TransactionIT extends BaseHBaseManagedTimeIT { conn.createStatement().execute("ALTER TABLE " + FULL_TABLE_NAME + " SET IMMUTABLE_ROWS=true"); testRowConflicts(); } - + + @Test + public void testNonTxToTxTable() throws Exception { + Connection conn = DriverManager.getConnection(getUrl()); + conn.createStatement().execute("CREATE TABLE NON_TX_TABLE(k INTEGER PRIMARY KEY, v VARCHAR)"); + conn.createStatement().execute("UPSERT INTO NON_TX_TABLE VALUES (1)"); + conn.createStatement().execute("UPSERT INTO NON_TX_TABLE VALUES (2, 'a')"); + conn.createStatement().execute("UPSERT INTO NON_TX_TABLE VALUES (3, 'b')"); + conn.commit(); + + conn.createStatement().execute("CREATE INDEX IDX ON NON_TX_TABLE(v)"); + + // Reset empty column value to an empty value like it is pre-transactions + /** TODO: when TEPHRA-143 is fixed, comment this back in + HTableInterface htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes("NON_TX_TABLE")); + List<Put>puts = Lists.newArrayList(new Put(PInteger.INSTANCE.toBytes(1)), new Put(PInteger.INSTANCE.toBytes(2)), new Put(PInteger.INSTANCE.toBytes(3))); + for (Put put : puts) { + put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, ByteUtil.EMPTY_BYTE_ARRAY); + } + htable.put(puts); + **/ + + conn.createStatement().execute("ALTER TABLE NON_TX_TABLE SET TRANSACTIONAL=true"); + + HTableInterface htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes("NON_TX_TABLE")); + assertTrue(htable.getTableDescriptor().getCoprocessors().contains(TransactionProcessor.class.getName())); + htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes("IDX")); + assertTrue(htable.getTableDescriptor().getCoprocessors().contains(TransactionProcessor.class.getName())); + + conn.createStatement().execute("UPSERT INTO NON_TX_TABLE VALUES (4, 'c')"); + ResultSet rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ k FROM NON_TX_TABLE WHERE v IS NULL"); + assertTrue(rs.next()); + assertEquals(1,rs.getInt(1)); + assertFalse(rs.next()); + conn.commit(); + + conn.createStatement().execute("UPSERT INTO NON_TX_TABLE VALUES (5, 'd')"); + rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ k FROM NON_TX_TABLE"); + assertTrue(rs.next()); + assertEquals(1,rs.getInt(1)); + assertTrue(rs.next()); + assertEquals(2,rs.getInt(1)); + assertTrue(rs.next()); + assertEquals(3,rs.getInt(1)); + assertTrue(rs.next()); + assertEquals(4,rs.getInt(1)); + assertTrue(rs.next()); + assertEquals(5,rs.getInt(1)); + assertFalse(rs.next()); + conn.rollback(); + + rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ k FROM NON_TX_TABLE"); + assertTrue(rs.next()); + assertEquals(1,rs.getInt(1)); + assertTrue(rs.next()); + assertEquals(2,rs.getInt(1)); + assertTrue(rs.next()); + assertEquals(3,rs.getInt(1)); + assertTrue(rs.next()); + assertEquals(4,rs.getInt(1)); + assertFalse(rs.next()); + + /* TODO: this should succeed too (with SELECT going through index), but doesn't. Try again after TEPHRA-143 is fixed. + * It might be the case that we're still using an empty value for indexes. + conn.createStatement().execute("UPSERT INTO NON_TX_TABLE VALUES (5, 'd')"); + rs = conn.createStatement().executeQuery("SELECT k FROM NON_TX_TABLE"); + assertTrue(rs.next()); + assertEquals(1,rs.getInt(1)); + assertTrue(rs.next()); + assertEquals(2,rs.getInt(1)); + assertTrue(rs.next()); + assertEquals(3,rs.getInt(1)); + assertTrue(rs.next()); + assertEquals(4,rs.getInt(1)); + assertTrue(rs.next()); + assertEquals(5,rs.getInt(1)); + assertFalse(rs.next()); + conn.rollback(); + + rs = conn.createStatement().executeQuery("SELECT k FROM NON_TX_TABLE"); + assertTrue(rs.next()); + assertEquals(1,rs.getInt(1)); + assertTrue(rs.next()); + assertEquals(2,rs.getInt(1)); + assertTrue(rs.next()); + assertEquals(3,rs.getInt(1)); + assertTrue(rs.next()); + assertEquals(4,rs.getInt(1)); + assertFalse(rs.next()); + */ + } + + @Test + public void testNonTxToTxTableFailure() throws Exception { + Connection conn = DriverManager.getConnection(getUrl()); + // Put table in SYSTEM schema to prevent attempts to update the cache after we disable SYSTEM.CATALOG + conn.createStatement().execute("CREATE TABLE SYSTEM.NON_TX_TABLE(k INTEGER PRIMARY KEY, v VARCHAR)"); + conn.createStatement().execute("UPSERT INTO SYSTEM.NON_TX_TABLE VALUES (1)"); + conn.commit(); + // Reset empty column value to an empty value like it is pre-transactions + HTableInterface htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes("SYSTEM.NON_TX_TABLE")); + Put put = new Put(PInteger.INSTANCE.toBytes(1)); + put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, ByteUtil.EMPTY_BYTE_ARRAY); + htable.put(put); + + HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin(); + admin.disableTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME); + try { + // This will succeed initially in updating the HBase metadata, but then will fail when + // the SYSTEM.CATALOG table is attempted to be updated, exercising the code to restore + // the coprocessors back to the non transactional ones. + conn.createStatement().execute("ALTER TABLE SYSTEM.NON_TX_TABLE SET TRANSACTIONAL=true"); + fail(); + } catch (SQLException e) { + assertTrue(e.getMessage().contains(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME + " is disabled")); + } finally { + admin.enableTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME); + admin.close(); + } + + ResultSet rs = conn.createStatement().executeQuery("SELECT k FROM SYSTEM.NON_TX_TABLE WHERE v IS NULL"); + assertTrue(rs.next()); + assertEquals(1,rs.getInt(1)); + assertFalse(rs.next()); + + htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes("SYSTEM.NON_TX_TABLE")); + assertFalse(htable.getTableDescriptor().getCoprocessors().contains(TransactionProcessor.class.getName())); + + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/4f454a75/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java index db50f83..9925f75 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java @@ -249,7 +249,7 @@ public enum SQLExceptionCode { DEFAULT_COLUMN_FAMILY_ON_SHARED_TABLE(1056, "43A13", "Default column family not allowed on VIEW or shared INDEX"), ONLY_TABLE_MAY_BE_DECLARED_TRANSACTIONAL(1070, "44A01", "Only tables may be declared as transactional"), - MAY_NOT_MAP_TO_EXISTING_TABLE_AS_TRANSACTIONAL(1071, "44A02", "An existing HBase table may not be mapped to as a transactional table"), + TX_MAY_NOT_SWITCH_TO_NON_TX(1071, "44A02", "A transactional table may not be switched to non transactional"), STORE_NULLS_MUST_BE_TRUE_FOR_TRANSACTIONAL(1072, "44A03", "Store nulls must be true when a table is transactional"), CANNOT_START_TRANSACTION_WITH_SCN_SET(1073, "44A04", "Cannot start a transaction on a connection with SCN set"), http://git-wip-us.apache.org/repos/asf/phoenix/blob/4f454a75/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java index e1c8dea..36679f6 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java @@ -776,11 +776,11 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd } @Override - public PMetaData addColumn(PName tenantId, String tableName, List<PColumn> columns, long tableTimeStamp, long tableSeqNum, boolean isImmutableRows, boolean isWalDisabled, boolean isMultitenant, boolean storeNulls, long resolvedTime) + public PMetaData addColumn(PName tenantId, String tableName, List<PColumn> columns, long tableTimeStamp, long tableSeqNum, boolean isImmutableRows, boolean isWalDisabled, boolean isMultitenant, boolean storeNulls, boolean isTransactional, long resolvedTime) throws SQLException { - metaData = metaData.addColumn(tenantId, tableName, columns, tableTimeStamp, tableSeqNum, isImmutableRows, isWalDisabled, isMultitenant, storeNulls, resolvedTime); + metaData = metaData.addColumn(tenantId, tableName, columns, tableTimeStamp, tableSeqNum, isImmutableRows, isWalDisabled, isMultitenant, storeNulls, isTransactional, resolvedTime); //Cascade through to connectionQueryServices too - getQueryServices().addColumn(tenantId, tableName, columns, tableTimeStamp, tableSeqNum, isImmutableRows, isWalDisabled, isMultitenant, storeNulls, resolvedTime); + getQueryServices().addColumn(tenantId, tableName, columns, tableTimeStamp, tableSeqNum, isImmutableRows, isWalDisabled, isMultitenant, storeNulls, isTransactional, resolvedTime); return metaData; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/4f454a75/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index 13a0559..2f9e251 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -43,6 +43,7 @@ import java.util.concurrent.TimeoutException; import javax.annotation.concurrent.GuardedBy; import co.cask.tephra.TransactionSystemClient; +import co.cask.tephra.TxConstants; import co.cask.tephra.distributed.PooledClientProvider; import co.cask.tephra.distributed.TransactionServiceClient; import co.cask.tephra.hbase98.coprocessor.TransactionProcessor; @@ -163,7 +164,6 @@ import org.apache.twill.zookeeper.ZKClients; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Objects; import com.google.common.base.Stopwatch; import com.google.common.base.Throwables; import com.google.common.cache.Cache; @@ -562,12 +562,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } @Override - public PMetaData addColumn(final PName tenantId, final String tableName, final List<PColumn> columns, final long tableTimeStamp, final long tableSeqNum, final boolean isImmutableRows, final boolean isWalDisabled, final boolean isMultitenant, final boolean storeNulls, final long resolvedTime) throws SQLException { + public PMetaData addColumn(final PName tenantId, final String tableName, final List<PColumn> columns, final long tableTimeStamp, final long tableSeqNum, final boolean isImmutableRows, final boolean isWalDisabled, final boolean isMultitenant, final boolean storeNulls, final boolean isTransactional, final long resolvedTime) throws SQLException { return metaDataMutated(tenantId, tableName, tableSeqNum, new Mutator() { @Override public PMetaData mutate(PMetaData metaData) throws SQLException { try { - return metaData.addColumn(tenantId, tableName, columns, tableTimeStamp, tableSeqNum, isImmutableRows, isWalDisabled, isMultitenant, storeNulls, resolvedTime); + return metaData.addColumn(tenantId, tableName, columns, tableTimeStamp, tableSeqNum, isImmutableRows, isWalDisabled, isMultitenant, storeNulls, isTransactional, resolvedTime); } catch (TableNotFoundException e) { // The DROP TABLE may have been processed first, so just ignore. return metaData; @@ -706,7 +706,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement if (!descriptor.hasCoprocessor(ServerCachingEndpointImpl.class.getName())) { descriptor.addCoprocessor(ServerCachingEndpointImpl.class.getName(), null, priority, null); } - boolean isTransactional = Boolean.TRUE.equals(tableProps.get(TableProperty.TRANSACTIONAL.name())); + boolean isTransactional = + Boolean.TRUE.equals(tableProps.get(TableProperty.TRANSACTIONAL.name())) || + Boolean.TRUE.equals(tableProps.get(TxConstants.READ_NON_TX_DATA)); // For ALTER TABLE // TODO: better encapsulation for this // Since indexes can't have indexes, don't install our indexing coprocessor for indexes. // Also don't install on the SYSTEM.CATALOG and SYSTEM.STATS table because we use @@ -718,8 +720,16 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement if (!descriptor.hasCoprocessor(PhoenixTransactionalIndexer.class.getName())) { descriptor.addCoprocessor(PhoenixTransactionalIndexer.class.getName(), null, priority, null); } + // For alter table, remove non transactional index coprocessor + if (descriptor.hasCoprocessor(Indexer.class.getName())) { + descriptor.removeCoprocessor(Indexer.class.getName()); + } } else { if (!descriptor.hasCoprocessor(Indexer.class.getName())) { + // If exception on alter table to transition back to non transactional + if (descriptor.hasCoprocessor(PhoenixTransactionalIndexer.class.getName())) { + descriptor.removeCoprocessor(PhoenixTransactionalIndexer.class.getName()); + } Map<String, String> opts = Maps.newHashMapWithExpectedSize(1); opts.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName()); Indexer.enableIndexing(descriptor, PhoenixIndexBuilder.class, opts, priority); @@ -761,8 +771,15 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } } - if (isTransactional && !descriptor.hasCoprocessor(TransactionProcessor.class.getName())) { - descriptor.addCoprocessor(TransactionProcessor.class.getName(), null, priority - 10, null); + if (isTransactional) { + if (!descriptor.hasCoprocessor(TransactionProcessor.class.getName())) { + descriptor.addCoprocessor(TransactionProcessor.class.getName(), null, priority - 10, null); + } + } else { + // If exception on alter table to transition back to non transactional + if (descriptor.hasCoprocessor(TransactionProcessor.class.getName())) { + descriptor.removeCoprocessor(TransactionProcessor.class.getName()); + } } } catch (IOException e) { throw ServerUtil.parseServerException(e); @@ -922,21 +939,25 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement if (!modifyExistingMetaData) { return existingDesc; // Caller already knows that no metadata was changed } + boolean willBeTx = Boolean.TRUE.equals(props.get(TableProperty.TRANSACTIONAL.name())); + // If mapping an existing table as transactional, set property so that existing + // data is correctly read. + if (willBeTx) { + newDesc.setValue(TxConstants.READ_NON_TX_DATA, Boolean.TRUE.toString()); + } else { + // If we think we're creating a non transactional table when it's already + // transactional, don't allow. + if (existingDesc.hasCoprocessor(TransactionProcessor.class.getName())) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.TX_MAY_NOT_SWITCH_TO_NON_TX) + .setSchemaName(SchemaUtil.getSchemaNameFromFullName(tableName)) + .setTableName(SchemaUtil.getTableNameFromFullName(tableName)).build().buildException(); + } + newDesc.remove(TxConstants.READ_NON_TX_DATA); + } if (existingDesc.equals(newDesc)) { return null; // Indicate that no metadata was changed } - // Don't allow TRANSACTIONAL attribute to change, as we may have issued - // a CREATE TABLE IF NOT EXISTS and be updating the metadata. - String existingTxnal = existingDesc.getValue(PhoenixDatabaseMetaData.TRANSACTIONAL); - String newTxnal = newDesc.getValue(PhoenixDatabaseMetaData.TRANSACTIONAL); - if (!Objects.equal(existingTxnal, newTxnal)) { - if (existingTxnal == null) { - newDesc.remove(PhoenixDatabaseMetaData.TRANSACTIONAL); - } else { - newDesc.setValue(PhoenixDatabaseMetaData.TRANSACTIONAL, existingTxnal); - } - } modifyTable(tableName, newDesc, true); return newDesc; } @@ -1263,22 +1284,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement byte[] tableName = physicalTableName != null ? physicalTableName : SchemaUtil.getTableNameAsBytes(schemaBytes, tableBytes); boolean localIndexTable = Boolean.TRUE.equals(tableProps.remove(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_NAME)); - HTableDescriptor tableDescriptor = null; if ((tableType == PTableType.VIEW && physicalTableName != null) || (tableType != PTableType.VIEW && physicalTableName == null)) { // For views this will ensure that metadata already exists // For tables and indexes, this will create the metadata if it doesn't already exist - tableDescriptor = ensureTableCreated(tableName, tableType, tableProps, families, splits, true); - // This means the HTable already existed and is transactional which is an error case for now, - // as the timestamps are likely not scaled and the table may have delete markers (which isn't - // handled by Tephra currently). It's possible that we could allow this, but only allow queries - // after a major compaction and some conversion process runs. - ImmutableBytesWritable ptr = new ImmutableBytesWritable(); - boolean isTransactional = MetaDataUtil.isTransactional(m, kvBuilder, ptr); - if (tableDescriptor != null && isTransactional) { - throw new SQLExceptionInfo.Builder(SQLExceptionCode.MAY_NOT_MAP_TO_EXISTING_TABLE_AS_TRANSACTIONAL) - .setSchemaName(Bytes.toString(schemaBytes)).setTableName(Bytes.toString(tableBytes)) - .build().buildException(); - } + ensureTableCreated(tableName, tableType, tableProps, families, splits, true); } ImmutableBytesWritable ptr = new ImmutableBytesWritable(); if (tableType == PTableType.INDEX) { // Index on view @@ -1530,12 +1539,156 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement public MetaDataMutationResult addColumn(final List<Mutation> tableMetaData, PTable table, Map<String, List<Pair<String,Object>>> stmtProperties, Set<String> colFamiliesForPColumnsToBeAdded) throws SQLException { List<Pair<byte[], Map<String, Object>>> families = new ArrayList<>(stmtProperties.size()); Map<String, Object> tableProps = new HashMap<String, Object>(); + Set<HTableDescriptor> tableDescriptors = Collections.emptySet(); + boolean nonTxToTx = false; HTableDescriptor tableDescriptor = separateAndValidateProperties(table, stmtProperties, colFamiliesForPColumnsToBeAdded, families, tableProps); - SQLException sqlE = null; if (tableDescriptor != null) { + tableDescriptors = Sets.newHashSetWithExpectedSize(3 + table.getIndexes().size()); + tableDescriptors.add(tableDescriptor); + nonTxToTx = Boolean.TRUE.equals(tableProps.get(TxConstants.READ_NON_TX_DATA)); + /* + * If the table was transitioned from non transactional to transactional, we need + * to also transition the index tables. + */ + if (nonTxToTx) { + updateDescriptorForTx(table, tableProps, tableDescriptor, Boolean.TRUE.toString(), tableDescriptors); + } + } + + boolean success = false; + boolean metaDataUpdated = !tableDescriptors.isEmpty(); + boolean pollingNeeded = !(!tableProps.isEmpty() && families.isEmpty() && colFamiliesForPColumnsToBeAdded.isEmpty()); + MetaDataMutationResult result = null; + try { + sendHBaseMetaData(tableDescriptors, pollingNeeded); + + // Special case for call during drop table to ensure that the empty column family exists. + // In this, case we only include the table header row, as until we add schemaBytes and tableBytes + // as args to this function, we have no way of getting them in this case. + // TODO: change to if (tableMetaData.isEmpty()) once we pass through schemaBytes and tableBytes + // Also, could be used to update property values on ALTER TABLE t SET prop=xxx + if ((tableMetaData.isEmpty()) || (tableMetaData.size() == 1 && tableMetaData.get(0).isEmpty())) { + return new MetaDataMutationResult(MutationCode.NO_OP, System.currentTimeMillis(), table); + } + byte[][] rowKeyMetaData = new byte[3][]; + PTableType tableType = table.getType(); + + Mutation m = tableMetaData.get(0); + byte[] rowKey = m.getRow(); + SchemaUtil.getVarChars(rowKey, rowKeyMetaData); + byte[] tenantIdBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX]; + byte[] schemaBytes = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX]; + byte[] tableBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX]; + byte[] tableKey = SchemaUtil.getTableKey(tenantIdBytes, schemaBytes, tableBytes); + + ImmutableBytesWritable ptr = new ImmutableBytesWritable(); + result = metaDataCoprocessorExec(tableKey, + new Batch.Call<MetaDataService, MetaDataResponse>() { + @Override + public MetaDataResponse call(MetaDataService instance) throws IOException { + ServerRpcController controller = new ServerRpcController(); + BlockingRpcCallback<MetaDataResponse> rpcCallback = + new BlockingRpcCallback<MetaDataResponse>(); + AddColumnRequest.Builder builder = AddColumnRequest.newBuilder(); + for (Mutation m : tableMetaData) { + MutationProto mp = ProtobufUtil.toProto(m); + builder.addTableMetadataMutations(mp.toByteString()); + } + + instance.addColumn(controller, builder.build(), rpcCallback); + if(controller.getFailedOn() != null) { + throw controller.getFailedOn(); + } + return rpcCallback.get(); + } + }); + + if (result.getMutationCode() == MutationCode.COLUMN_NOT_FOUND || result.getMutationCode() == MutationCode.TABLE_ALREADY_EXISTS) { // Success + success = true; + // Flush the table if transitioning DISABLE_WAL from TRUE to FALSE + if ( MetaDataUtil.getMutationValue(m,PhoenixDatabaseMetaData.DISABLE_WAL_BYTES, kvBuilder, ptr) + && Boolean.FALSE.equals(PBoolean.INSTANCE.toObject(ptr))) { + flushTable(table.getPhysicalName().getBytes()); + } + + if (tableType == PTableType.TABLE) { + // If we're changing MULTI_TENANT to true or false, create or drop the view index table + if (MetaDataUtil.getMutationValue(m, PhoenixDatabaseMetaData.MULTI_TENANT_BYTES, kvBuilder, ptr)){ + long timestamp = MetaDataUtil.getClientTimeStamp(m); + if (Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(ptr.get(), ptr.getOffset(), ptr.getLength()))) { + this.ensureViewIndexTableCreated(table, timestamp); + } else { + this.ensureViewIndexTableDropped(table.getPhysicalName().getBytes(), timestamp); + } + } + } + } + } finally { + // If we weren't successful with our metadata update + // and we've already pushed the HBase metadata changes to the server + // and we've tried to go from non transactional to transactional + // then we must undo the metadata change otherwise the table will + // no longer function correctly. + // Note that if this fails, we're in a corrupt state. + if (!success && metaDataUpdated && nonTxToTx) { + tableDescriptors.clear(); + tableDescriptors.add(tableDescriptor); + tableProps.remove(TxConstants.READ_NON_TX_DATA); + updateDescriptorForTx(table, tableProps, tableDescriptor, null, tableDescriptors); + sendHBaseMetaData(tableDescriptors, pollingNeeded); + } + } + return result; + } + private void updateDescriptorForTx(PTable table, Map<String, Object> tableProps, HTableDescriptor tableDescriptor, + String txValue, Set<HTableDescriptor> descriptorsToUpdate) throws SQLException { + HBaseAdmin admin = null; + byte[] physicalTableName = table.getPhysicalName().getBytes(); + try { + admin = new HBaseAdmin(config); + setTransactional(tableDescriptor, table.getType(), txValue, tableProps); + Map<String, Object> indexTableProps; + if (txValue == null) { + indexTableProps = Collections.<String,Object>emptyMap(); + } else { + indexTableProps = Maps.newHashMapWithExpectedSize(1); + indexTableProps.put(TxConstants.READ_NON_TX_DATA, Boolean.valueOf(txValue)); + } + for (PTable index : table.getIndexes()) { + HTableDescriptor indexDescriptor = admin.getTableDescriptor(index.getPhysicalName().getBytes()); + descriptorsToUpdate.add(indexDescriptor); + setTransactional(indexDescriptor, index.getType(), txValue, indexTableProps); + } + try { + HTableDescriptor indexDescriptor = admin.getTableDescriptor(MetaDataUtil.getViewIndexPhysicalName(physicalTableName)); + descriptorsToUpdate.add(indexDescriptor); + setTransactional(indexDescriptor, PTableType.INDEX, txValue, indexTableProps); + } catch (org.apache.hadoop.hbase.TableNotFoundException ignore) { + // Ignore, as we may never have created a view index table + } + try { + HTableDescriptor indexDescriptor = admin.getTableDescriptor(MetaDataUtil.getLocalIndexPhysicalName(physicalTableName)); + descriptorsToUpdate.add(indexDescriptor); + setTransactional(indexDescriptor, PTableType.INDEX, txValue, indexTableProps); + } catch (org.apache.hadoop.hbase.TableNotFoundException ignore) { + // Ignore, as we may never have created a view index table + } + } catch (IOException e) { + throw ServerUtil.parseServerException(e); + } finally { + try { + if (admin != null) admin.close(); + } catch (IOException e) { + logger.warn("Could not close admin",e); + } + } + } + + private void sendHBaseMetaData(Set<HTableDescriptor> tableDescriptors, boolean pollingNeeded) throws SQLException { + SQLException sqlE = null; + for (HTableDescriptor descriptor : tableDescriptors) { try { - boolean pollingNotNeeded = (!tableProps.isEmpty() && families.isEmpty() && colFamiliesForPColumnsToBeAdded.isEmpty()); - modifyTable(table.getPhysicalName().getBytes(), tableDescriptor, !pollingNotNeeded); + modifyTable(descriptor.getName(), descriptor, pollingNeeded); } catch (IOException e) { sqlE = ServerUtil.parseServerException(e); } catch (InterruptedException e) { @@ -1550,68 +1703,14 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } } } - - // Special case for call during drop table to ensure that the empty column family exists. - // In this, case we only include the table header row, as until we add schemaBytes and tableBytes - // as args to this function, we have no way of getting them in this case. - // TODO: change to if (tableMetaData.isEmpty()) once we pass through schemaBytes and tableBytes - // Also, could be used to update property values on ALTER TABLE t SET prop=xxx - if ((tableMetaData.isEmpty()) || (tableMetaData.size() == 1 && tableMetaData.get(0).isEmpty())) { - return new MetaDataMutationResult(MutationCode.NO_OP, System.currentTimeMillis(), table); - } - byte[][] rowKeyMetaData = new byte[3][]; - PTableType tableType = table.getType(); - - Mutation m = tableMetaData.get(0); - byte[] rowKey = m.getRow(); - SchemaUtil.getVarChars(rowKey, rowKeyMetaData); - byte[] tenantIdBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX]; - byte[] schemaBytes = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX]; - byte[] tableBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX]; - byte[] tableKey = SchemaUtil.getTableKey(tenantIdBytes, schemaBytes, tableBytes); - - ImmutableBytesWritable ptr = new ImmutableBytesWritable(); - MetaDataMutationResult result = metaDataCoprocessorExec(tableKey, - new Batch.Call<MetaDataService, MetaDataResponse>() { - @Override - public MetaDataResponse call(MetaDataService instance) throws IOException { - ServerRpcController controller = new ServerRpcController(); - BlockingRpcCallback<MetaDataResponse> rpcCallback = - new BlockingRpcCallback<MetaDataResponse>(); - AddColumnRequest.Builder builder = AddColumnRequest.newBuilder(); - for (Mutation m : tableMetaData) { - MutationProto mp = ProtobufUtil.toProto(m); - builder.addTableMetadataMutations(mp.toByteString()); - } - - instance.addColumn(controller, builder.build(), rpcCallback); - if(controller.getFailedOn() != null) { - throw controller.getFailedOn(); - } - return rpcCallback.get(); - } - }); - - if (result.getMutationCode() == MutationCode.COLUMN_NOT_FOUND) { // Success - // Flush the table if transitioning DISABLE_WAL from TRUE to FALSE - if ( MetaDataUtil.getMutationValue(m,PhoenixDatabaseMetaData.DISABLE_WAL_BYTES, kvBuilder, ptr) - && Boolean.FALSE.equals(PBoolean.INSTANCE.toObject(ptr))) { - flushTable(table.getPhysicalName().getBytes()); - } - - if (tableType == PTableType.TABLE) { - // If we're changing MULTI_TENANT to true or false, create or drop the view index table - if (MetaDataUtil.getMutationValue(m, PhoenixDatabaseMetaData.MULTI_TENANT_BYTES, kvBuilder, ptr)){ - long timestamp = MetaDataUtil.getClientTimeStamp(m); - if (Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(ptr.get(), ptr.getOffset(), ptr.getLength()))) { - this.ensureViewIndexTableCreated(table, timestamp); - } else { - this.ensureViewIndexTableDropped(table.getPhysicalName().getBytes(), timestamp); - } - } - } + } + private void setTransactional(HTableDescriptor tableDescriptor, PTableType tableType, String txValue, Map<String, Object> tableProps) throws SQLException { + if (txValue == null) { + tableDescriptor.remove(TxConstants.READ_NON_TX_DATA); + } else { + tableDescriptor.setValue(TxConstants.READ_NON_TX_DATA, txValue); } - return result; + this.addCoprocessors(tableDescriptor.getName(), tableDescriptor, tableType, tableProps); } private HTableDescriptor separateAndValidateProperties(PTable table, Map<String, List<Pair<String, Object>>> properties, Set<String> colFamiliesForPColumnsToBeAdded, List<Pair<byte[], Map<String, Object>>> families, Map<String, Object> tableProps) throws SQLException { @@ -1648,6 +1747,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement // Even though TTL is really a HColumnProperty we treat it specially. // We enforce that all column families have the same TTL. commonFamilyProps.put(propName, prop.getSecond()); + } else if (propName.equals(PhoenixDatabaseMetaData.TRANSACTIONAL) && Boolean.TRUE.equals(propValue)) { + tableProps.put(TxConstants.READ_NON_TX_DATA, propValue); } } else { if (isHColumnProperty(propName)) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/4f454a75/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java index 3f31141..4dbb257 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java @@ -173,8 +173,8 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple @Override public PMetaData addColumn(PName tenantId, String tableName, List<PColumn> columns, long tableTimeStamp, - long tableSeqNum, boolean isImmutableRows, boolean isWalDisabled, boolean isMultitenant, boolean storeNulls, long resolvedTime) throws SQLException { - return metaData = metaData.addColumn(tenantId, tableName, columns, tableTimeStamp, tableSeqNum, isImmutableRows, isWalDisabled, isMultitenant, storeNulls, resolvedTime); + long tableSeqNum, boolean isImmutableRows, boolean isWalDisabled, boolean isMultitenant, boolean storeNulls, boolean isTransactional, long resolvedTime) throws SQLException { + return metaData = metaData.addColumn(tenantId, tableName, columns, tableTimeStamp, tableSeqNum, isImmutableRows, isWalDisabled, isMultitenant, storeNulls, isTransactional, resolvedTime); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/4f454a75/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java index 460a879..5b32f3b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java @@ -85,8 +85,8 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple @Override public PMetaData addColumn(PName tenantId, String tableName, List<PColumn> columns, long tableTimeStamp, - long tableSeqNum, boolean isImmutableRows, boolean isWalDisabled, boolean isMultitenant, boolean storeNulls, long resolvedTime) throws SQLException { - return getDelegate().addColumn(tenantId, tableName, columns, tableTimeStamp, tableSeqNum, isImmutableRows, isWalDisabled, isMultitenant, storeNulls, resolvedTime); + long tableSeqNum, boolean isImmutableRows, boolean isWalDisabled, boolean isMultitenant, boolean storeNulls, boolean isTransactional, long resolvedTime) throws SQLException { + return getDelegate().addColumn(tenantId, tableName, columns, tableTimeStamp, tableSeqNum, isImmutableRows, isWalDisabled, isMultitenant, storeNulls, isTransactional, resolvedTime); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/4f454a75/phoenix-core/src/main/java/org/apache/phoenix/query/MetaDataMutated.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/MetaDataMutated.java b/phoenix-core/src/main/java/org/apache/phoenix/query/MetaDataMutated.java index c78d0b6..1ad3d8b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/MetaDataMutated.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/MetaDataMutated.java @@ -37,6 +37,6 @@ public interface MetaDataMutated { PMetaData addTable(PTable table, long resolvedTime) throws SQLException; PMetaData updateResolvedTimestamp(PTable table, long resolvedTimestamp) throws SQLException; PMetaData removeTable(PName tenantId, String tableName, String parentTableName, long tableTimeStamp) throws SQLException; - PMetaData addColumn(PName tenantId, String tableName, List<PColumn> columns, long tableTimeStamp, long tableSeqNum, boolean isImmutableRows, boolean isWalDisabled, boolean isMultitenant, boolean storeNulls, long resolvedTime) throws SQLException; + PMetaData addColumn(PName tenantId, String tableName, List<PColumn> columns, long tableTimeStamp, long tableSeqNum, boolean isImmutableRows, boolean isWalDisabled, boolean isMultitenant, boolean storeNulls, boolean isTransactional, long resolvedTime) throws SQLException; PMetaData removeColumn(PName tenantId, String tableName, List<PColumn> columnsToRemove, long tableTimeStamp, long tableSeqNum, long resolvedTime) throws SQLException; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/4f454a75/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index b2982e4..1da3295 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -2040,12 +2040,12 @@ public class MetaDataClient { return mutationCode; } - private long incrementTableSeqNum(PTable table, PTableType expectedType, int columnCountDelta) throws SQLException { - return incrementTableSeqNum(table, expectedType, columnCountDelta, null, null, null, null); + private long incrementTableSeqNum(PTable table, PTableType expectedType, int columnCountDelta, Boolean isTransactional) throws SQLException { + return incrementTableSeqNum(table, expectedType, columnCountDelta, isTransactional, null, null, null, null); } private long incrementTableSeqNum(PTable table, PTableType expectedType, int columnCountDelta, - Boolean isImmutableRows, Boolean disableWAL, Boolean isMultiTenant, Boolean storeNulls) + Boolean isTransactional, Boolean isImmutableRows, Boolean disableWAL, Boolean isMultiTenant, Boolean storeNulls) throws SQLException { String schemaName = table.getSchemaName().getString(); String tableName = table.getTableName().getString(); @@ -2077,6 +2077,9 @@ public class MetaDataClient { if (storeNulls != null) { mutateBooleanProperty(tenantId, schemaName, tableName, STORE_NULLS, storeNulls); } + if (isTransactional != null) { + mutateBooleanProperty(tenantId, schemaName, tableName, TRANSACTIONAL, isTransactional); + } return seqNum; } @@ -2110,6 +2113,7 @@ public class MetaDataClient { Boolean multiTenantProp = null; Boolean disableWALProp = null; Boolean storeNullsProp = null; + Boolean isTransactionalProp = null; ListMultimap<String,Pair<String,Object>> stmtProperties = statement.getProps(); Map<String, List<Pair<String, Object>>> properties = new HashMap<>(stmtProperties.size()); @@ -2134,6 +2138,8 @@ public class MetaDataClient { disableWALProp = (Boolean)prop.getSecond(); } else if (propName.equals(STORE_NULLS)) { storeNullsProp = (Boolean)prop.getSecond(); + } else if (propName.equals(TRANSACTIONAL)) { + isTransactionalProp = (Boolean)prop.getSecond(); } } } @@ -2141,6 +2147,7 @@ public class MetaDataClient { } boolean retried = false; boolean changingPhoenixTableProperty = false; + boolean nonTxToTx = false; while (true) { ColumnResolver resolver = FromCompiler.getResolver(statement, connection); table = resolver.getTables().get(0).getTable(); @@ -2192,6 +2199,23 @@ public class MetaDataClient { changingPhoenixTableProperty = true; } } + Boolean isTransactional = null; + if (isTransactionalProp != null) { + if (isTransactionalProp.booleanValue() != table.isTransactional()) { + isTransactional = isTransactionalProp; + // We can only go one way: from non transactional to transactional + // Going the other way would require rewriting the cell timestamps + // and doing a major compaction to get rid of any Tephra specific + // delete markers. + if (!isTransactional) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.TX_MAY_NOT_SWITCH_TO_NON_TX) + .setSchemaName(schemaName).setTableName(tableName).build().buildException(); + } + timeStamp = TransactionUtil.getTableTimestamp(connection, isTransactional); + changingPhoenixTableProperty = true; + nonTxToTx = true; + } + } int numPkColumnsAdded = 0; PreparedStatement colUpsert = connection.prepareStatement(INSERT_COLUMN); @@ -2266,10 +2290,12 @@ public class MetaDataClient { if (Boolean.FALSE.equals(isImmutableRows) && !table.getIndexes().isEmpty()) { int hbaseVersion = connection.getQueryServices().getLowestClusterHBaseVersion(); if (hbaseVersion < PhoenixDatabaseMetaData.MUTABLE_SI_VERSION_THRESHOLD) { - throw new SQLExceptionInfo.Builder(SQLExceptionCode.NO_MUTABLE_INDEXES).setSchemaName(schemaName).setTableName(tableName).build().buildException(); + throw new SQLExceptionInfo.Builder(SQLExceptionCode.NO_MUTABLE_INDEXES) + .setSchemaName(schemaName).setTableName(tableName).build().buildException(); } if (connection.getQueryServices().hasInvalidIndexConfiguration() && !table.isTransactional()) { - throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_MUTABLE_INDEX_CONFIG).setSchemaName(schemaName).setTableName(tableName).build().buildException(); + throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_MUTABLE_INDEX_CONFIG) + .setSchemaName(schemaName).setTableName(tableName).build().buildException(); } } if (Boolean.TRUE.equals(multiTenant)) { @@ -2277,16 +2303,18 @@ public class MetaDataClient { } } - if (numPkColumnsAdded>0 && !table.getIndexes().isEmpty()) { + if (!table.getIndexes().isEmpty() && (numPkColumnsAdded>0 || nonTxToTx)) { for (PTable index : table.getIndexes()) { - incrementTableSeqNum(index, index.getType(), 1); + // TODO: verify master has fix for multiple index columns added and unit test + incrementTableSeqNum(index, index.getType(), numPkColumnsAdded, nonTxToTx ? Boolean.TRUE : null); } tableMetaData.addAll(connection.getMutationState().toMutations(timeStamp).next().getSecond()); connection.rollback(); } long seqNum = table.getSequenceNumber(); if (changingPhoenixTableProperty || columnDefs.size() > 0) { - seqNum = incrementTableSeqNum(table, statement.getTableType(), 1, isImmutableRows, disableWAL, multiTenant, storeNulls); + // TODO: verify master has fix for multiple data columns added and unit test + seqNum = incrementTableSeqNum(table, statement.getTableType(), columnDefs.size(), isTransactional, isImmutableRows, disableWAL, multiTenant, storeNulls); tableMetaData.addAll(connection.getMutationState().toMutations(timeStamp).next().getSecond()); connection.rollback(); } @@ -2336,6 +2364,7 @@ public class MetaDataClient { disableWAL == null ? table.isWALDisabled() : disableWAL, multiTenant == null ? table.isMultiTenant() : multiTenant, storeNulls == null ? table.getStoreNulls() : storeNulls, + isTransactional == null ? table.isTransactional() : isTransactional, TransactionUtil.getResolvedTime(connection, result)); } // Delete rows in view index if we haven't dropped it already @@ -2516,7 +2545,8 @@ public class MetaDataClient { } } if(!indexColumnsToDrop.isEmpty()) { - incrementTableSeqNum(index, index.getType(), -1); + // TODO: verify master has fix for multiple index columns dropped and unit test + incrementTableSeqNum(index, index.getType(), -indexColumnsToDrop.size(), null); dropColumnMutations(index, indexColumnsToDrop, tableMetaData); } @@ -2525,7 +2555,8 @@ public class MetaDataClient { tableMetaData.addAll(connection.getMutationState().toMutations(timeStamp).next().getSecond()); connection.rollback(); - long seqNum = incrementTableSeqNum(table, statement.getTableType(), -1); + // TODO: verify master has fix for multiple data columns dropped and unit test + long seqNum = incrementTableSeqNum(table, statement.getTableType(), -tableColumnsToDrop.size(), null); tableMetaData.addAll(connection.getMutationState().toMutations(timeStamp).next().getSecond()); connection.rollback(); // Force table header to be first in list http://git-wip-us.apache.org/repos/asf/phoenix/blob/4f454a75/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java index 8cfbb18..ae92677 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java @@ -285,7 +285,7 @@ public class PMetaDataImpl implements PMetaData { } @Override - public PMetaData addColumn(PName tenantId, String tableName, List<PColumn> columnsToAdd, long tableTimeStamp, long tableSeqNum, boolean isImmutableRows, boolean isWalDisabled, boolean isMultitenant, boolean storeNulls, long resolvedTime) throws SQLException { + public PMetaData addColumn(PName tenantId, String tableName, List<PColumn> columnsToAdd, long tableTimeStamp, long tableSeqNum, boolean isImmutableRows, boolean isWalDisabled, boolean isMultitenant, boolean storeNulls, boolean isTransactional, long resolvedTime) throws SQLException { PTableRef oldTableRef = metaData.get(new PTableKey(tenantId, tableName)); if (oldTableRef == null) { return this; @@ -299,7 +299,7 @@ public class PMetaDataImpl implements PMetaData { newColumns.addAll(oldColumns); newColumns.addAll(columnsToAdd); } - PTable newTable = PTableImpl.makePTable(oldTableRef.getTable(), tableTimeStamp, tableSeqNum, newColumns, isImmutableRows, isWalDisabled, isMultitenant, storeNulls); + PTable newTable = PTableImpl.makePTable(oldTableRef.getTable(), tableTimeStamp, tableSeqNum, newColumns, isImmutableRows, isWalDisabled, isMultitenant, storeNulls, isTransactional); return addTable(newTable, resolvedTime); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/4f454a75/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java index b78a904..fcd25f4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java @@ -220,12 +220,12 @@ public class PTableImpl implements PTable { table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.isTransactional(), table.getTableStats()); } - public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, List<PColumn> columns, boolean isImmutableRows, boolean isWalDisabled, boolean isMultitenant, boolean storeNulls) throws SQLException { + public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, List<PColumn> columns, boolean isImmutableRows, boolean isWalDisabled, boolean isMultitenant, boolean storeNulls, boolean isTransactional) throws SQLException { return new PTableImpl( table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), timeStamp, sequenceNumber, table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(), isImmutableRows, table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), - isWalDisabled, isMultitenant, storeNulls, table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.isTransactional(), table.getTableStats()); + isWalDisabled, isMultitenant, storeNulls, table.getViewType(), table.getViewIndexId(), table.getIndexType(), isTransactional, table.getTableStats()); } public static PTableImpl makePTable(PTable table, PIndexState state) throws SQLException { http://git-wip-us.apache.org/repos/asf/phoenix/blob/4f454a75/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java index 3c96405..091b929 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java @@ -48,7 +48,7 @@ public enum TableProperty { STORE_NULLS(PhoenixDatabaseMetaData.STORE_NULLS, COLUMN_FAMILY_NOT_ALLOWED_TABLE_PROPERTY, true, false), - TRANSACTIONAL(PhoenixDatabaseMetaData.TRANSACTIONAL, COLUMN_FAMILY_NOT_ALLOWED_TABLE_PROPERTY, false, CANNOT_ALTER_PROPERTY, false), + TRANSACTIONAL(PhoenixDatabaseMetaData.TRANSACTIONAL, COLUMN_FAMILY_NOT_ALLOWED_TABLE_PROPERTY, true, false), ;
