Repository: phoenix
Updated Branches:
  refs/heads/txn 00976e81b -> 995e352c6
Basic configuration of a transactional table Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/995e352c Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/995e352c Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/995e352c Branch: refs/heads/txn Commit: 995e352c6971fc51888a25bfd0a5b7b737eee958 Parents: 00976e8 Author: James Taylor <[email protected]> Authored: Tue Mar 10 16:34:05 2015 -0700 Committer: James Taylor <[email protected]> Committed: Tue Mar 10 16:34:05 2015 -0700 ---------------------------------------------------------------------- .../apache/phoenix/end2end/AlterTableIT.java | 55 ++++++++++++++++++++ .../apache/phoenix/compile/PostDDLCompiler.java | 31 +++++++++-- .../phoenix/jdbc/PhoenixDatabaseMetaData.java | 3 ++ .../query/ConnectionQueryServicesImpl.java | 44 ++++++++++++++-- .../org/apache/phoenix/query/HTableFactory.java | 9 +++- .../apache/phoenix/schema/MetaDataClient.java | 4 +- .../org/apache/phoenix/util/SchemaUtil.java | 11 ++++ 7 files changed, 147 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/995e352c/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java index 59698d6..9c0171f 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java @@ -38,10 +38,15 @@ import java.util.Collections; import java.util.Map; import java.util.Properties; +import co.cask.tephra.hbase98.TransactionAwareHTable; +import co.cask.tephra.hbase98.coprocessor.TransactionProcessor; + import org.apache.hadoop.hbase.HColumnDescriptor; import 
org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeepDeletedCells; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.coprocessor.MetaDataProtocol; import org.apache.phoenix.exception.SQLExceptionCode; @@ -50,6 +55,7 @@ import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTableKey; +import org.apache.phoenix.schema.TableAlreadyExistsException; import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.PhoenixRuntime; @@ -1988,4 +1994,53 @@ public class AlterTableIT extends BaseOwnClusterHBaseManagedTimeIT { conn.close(); } } + + @Test + public void testAlterTableToBeTransactional() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + String ddl = "CREATE TABLE test_table (k varchar primary key)"; + createTestTable(getUrl(), ddl); + + try { + ddl = "ALTER TABLE test_table SET transactional=true"; + conn.createStatement().execute(ddl); + fail(); + } catch (SQLException e) { + assertEquals(SQLExceptionCode.SET_UNSUPPORTED_PROP_ON_ALTER_TABLE.getErrorCode(),e.getErrorCode()); + } + } + + + @Test + public void testCreateTableToBeTransactional() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + String ddl = "CREATE TABLE TEST_TRANSACTIONAL_TABLE (k varchar primary key) transactional=true"; + conn.createStatement().execute(ddl); + PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class); + HTableInterface htable = pconn.getQueryServices().getTable(Bytes.toBytes("TEST_TRANSACTIONAL_TABLE")); + 
assertTrue(SchemaUtil.isTransactional(htable.getTableDescriptor())); + assertTrue(htable instanceof TransactionAwareHTable); + assertTrue(htable.getTableDescriptor().getCoprocessors().contains(TransactionProcessor.class.getName())); + + HBaseAdmin admin = pconn.getQueryServices().getAdmin(); + HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("TXN_TEST_EXISTING")); + desc.addFamily(new HColumnDescriptor(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES)); + admin.createTable(desc); + try { + ddl = "CREATE TABLE TXN_TEST_EXISTING (k varchar primary key) transactional=true"; + conn.createStatement().execute(ddl); + fail(); + } catch (TableAlreadyExistsException e) { + } + // stays transactional + ddl = "CREATE TABLE IF NOT EXISTS TEST_TRANSACTIONAL_TABLE (k varchar primary key)"; + conn.createStatement().execute(ddl); + assertTrue(SchemaUtil.isTransactional(pconn.getQueryServices().getTable(Bytes.toBytes("TEST_TRANSACTIONAL_TABLE")).getTableDescriptor())); + // stays non transactional + ddl = "CREATE TABLE IF NOT EXISTS TXN_TEST_EXISTING (k varchar primary key) transactional=true"; + conn.createStatement().execute(ddl); + assertFalse(SchemaUtil.isTransactional(pconn.getQueryServices().getTable(Bytes.toBytes("TXN_TEST_EXISTING")).getTableDescriptor())); + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/995e352c/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java index 0c586f0..6b61039 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java @@ -42,10 +42,10 @@ import org.apache.phoenix.schema.ColumnNotFoundException; import org.apache.phoenix.schema.ColumnRef; import 
org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PColumnFamily; -import org.apache.phoenix.schema.types.PLong; import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.schema.types.PLong; import org.apache.phoenix.util.ScanUtil; import com.google.common.collect.Lists; @@ -65,7 +65,7 @@ import com.google.common.collect.Lists; */ public class PostDDLCompiler { private final PhoenixConnection connection; - private final StatementContext context; // bogus context + private final Scan scan; public PostDDLCompiler(PhoenixConnection connection) { this(connection, new Scan()); @@ -73,13 +73,36 @@ public class PostDDLCompiler { public PostDDLCompiler(PhoenixConnection connection, Scan scan) { this.connection = connection; - this.context = new StatementContext(new PhoenixStatement(connection), scan); + this.scan = scan; scan.setAttribute(BaseScannerRegionObserver.UNGROUPED_AGG, QueryConstants.TRUE); } public MutationPlan compile(final List<TableRef> tableRefs, final byte[] emptyCF, final byte[] projectCF, final List<PColumn> deleteList, final long timestamp) throws SQLException { - + PhoenixStatement statement = new PhoenixStatement(connection); + final StatementContext context = new StatementContext( + statement, + new ColumnResolver() { + + @Override + public List<TableRef> getTables() { + return tableRefs; + } + + @Override + public TableRef resolveTable(String schemaName, String tableName) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public ColumnRef resolveColumn(String schemaName, String tableName, String colName) + throws SQLException { + throw new UnsupportedOperationException(); + } + + }, + scan, + new SequenceManager(statement)); return new MutationPlan() { @Override 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/995e352c/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java index 154fef7..400c921 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java @@ -266,6 +266,9 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData, org.apache.pho private static final String TENANT_POS_SHIFT = "TENANT_POS_SHIFT"; private static final byte[] TENANT_POS_SHIFT_BYTES = Bytes.toBytes(TENANT_POS_SHIFT); + public static final String TRANSACTIONAL = "TRANSACTIONAL"; + public static final byte[] TRANSACTIONAL_BYTES = Bytes.toBytes(TRANSACTIONAL); + private final PhoenixConnection connection; private final ResultSet emptyResultSet; public static final int MAX_LOCAL_SI_VERSION_DISALLOW = VersionUtil.encodeVersion("0", "98", "8"); http://git-wip-us.apache.org/repos/asf/phoenix/blob/995e352c/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index b149b92..8a8e072 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -42,6 +42,8 @@ import java.util.concurrent.TimeoutException; import javax.annotation.concurrent.GuardedBy; +import co.cask.tephra.hbase98.coprocessor.TransactionProcessor; + import 
org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; @@ -152,6 +154,7 @@ import org.apache.phoenix.util.UpgradeUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Objects; import com.google.common.base.Stopwatch; import com.google.common.base.Throwables; import com.google.common.cache.Cache; @@ -292,9 +295,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement try { return HBaseFactoryProvider.getHTableFactory().getTable(tableName, connection, getExecutor()); } catch (org.apache.hadoop.hbase.TableNotFoundException e) { - byte[][] schemaAndTableName = new byte[2][]; - SchemaUtil.getVarChars(tableName, schemaAndTableName); - throw new TableNotFoundException(Bytes.toString(schemaAndTableName[0]), Bytes.toString(schemaAndTableName[1])); + throw new TableNotFoundException(SchemaUtil.getSchemaNameFromFullName(tableName), SchemaUtil.getTableNameFromFullName(tableName)); } catch (IOException e) { throw new SQLException(e); } @@ -703,6 +704,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement descriptor.addCoprocessor(SequenceRegionObserver.class.getName(), null, priority, null); } } + + if (SchemaUtil.isTransactional(descriptor) && !descriptor.hasCoprocessor(TransactionProcessor.class.getName())) { + descriptor.addCoprocessor(TransactionProcessor.class.getName(), null, priority - 10, null); + } } catch (IOException e) { throw ServerUtil.parseServerException(e); } @@ -862,6 +867,17 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement return existingDesc; } + // Don't allow TRANSACTIONAL attribute to change, as we may have issued + // a CREATE TABLE IF NOT EXISTS and be updating the metadata. 
+ String existingTxnal = existingDesc.getValue(PhoenixDatabaseMetaData.TRANSACTIONAL); + String newTxnal = newDesc.getValue(PhoenixDatabaseMetaData.TRANSACTIONAL); + if (!Objects.equal(existingTxnal, newTxnal)) { + if (existingTxnal == null) { + newDesc.remove(PhoenixDatabaseMetaData.TRANSACTIONAL); + } else { + newDesc.setValue(PhoenixDatabaseMetaData.TRANSACTIONAL, existingTxnal); + } + } modifyTable(tableName, newDesc, true); return newDesc; } @@ -1188,10 +1204,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement byte[] tableName = physicalTableName != null ? physicalTableName : SchemaUtil.getTableNameAsBytes(schemaBytes, tableBytes); boolean localIndexTable = Boolean.TRUE.equals(tableProps.remove(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_NAME)); + HTableDescriptor tableDescriptor = null; if ((tableType == PTableType.VIEW && physicalTableName != null) || (tableType != PTableType.VIEW && physicalTableName == null)) { // For views this will ensure that metadata already exists // For tables and indexes, this will create the metadata if it doesn't already exist - ensureTableCreated(tableName, tableType, tableProps, families, splits, true); + tableDescriptor = ensureTableCreated(tableName, tableType, tableProps, families, splits, true); } ImmutableBytesWritable ptr = new ImmutableBytesWritable(); if (tableType == PTableType.INDEX) { // Index on view @@ -1246,6 +1263,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement return rpcCallback.get(); } }); + // This means the HTable already existed and is transactional which is an + // error case unless IF NOT EXISTS was supplied (which the caller will check). 
+ Object isTransactional = tableProps.get(PhoenixDatabaseMetaData.TRANSACTIONAL); + if (tableDescriptor != null && Boolean.TRUE.equals(isTransactional) != SchemaUtil.isTransactional(tableDescriptor)) { + return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, result.getMutationTime(), result.getTable()); + } return result; } @@ -1447,6 +1470,13 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement SQLException sqlE = null; if (tableDescriptor != null) { try { + if (SchemaUtil.hasTransactional(tableDescriptor)) { + throw new SQLExceptionInfo.Builder( + SQLExceptionCode.SET_UNSUPPORTED_PROP_ON_ALTER_TABLE) + .setMessage(PhoenixDatabaseMetaData.TRANSACTIONAL) + .setSchemaName(table.getSchemaName().getString()) + .setTableName(table.getTableName().getString()).build().buildException(); + } boolean pollingNotNeeded = (!tableProps.isEmpty() && families.isEmpty() && colFamiliesForPColumnsToBeAdded.isEmpty()); modifyTable(table.getPhysicalName().getBytes(), tableDescriptor, !pollingNotNeeded); } catch (IOException e) { @@ -1988,6 +2018,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement @Override public MutationState updateData(MutationPlan plan) throws SQLException { + PTable table = plan.getContext().getCurrentTable().getTable(); + HTableDescriptor desc = this.getTableDescriptor(table.getPhysicalName().getBytes()); + if (SchemaUtil.isTransactional(desc)) { + return new MutationState(1, plan.getConnection()); + } + return plan.execute(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/995e352c/phoenix-core/src/main/java/org/apache/phoenix/query/HTableFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/HTableFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/query/HTableFactory.java index 7a10683..4e1c089 100644 --- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/HTableFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/HTableFactory.java @@ -20,8 +20,11 @@ package org.apache.phoenix.query; import java.io.IOException; import java.util.concurrent.ExecutorService; +import co.cask.tephra.hbase98.TransactionAwareHTable; + import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.phoenix.util.SchemaUtil; /** * Creates clients to access HBase tables. @@ -47,7 +50,11 @@ public interface HTableFactory { static class HTableFactoryImpl implements HTableFactory { @Override public HTableInterface getTable(byte[] tableName, HConnection connection, ExecutorService pool) throws IOException { - return connection.getTable(tableName, pool); + HTableInterface htable = connection.getTable(tableName, pool); + if (SchemaUtil.isTransactional(htable.getTableDescriptor())) { + return new TransactionAwareHTable(htable); + } + return htable; } } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/995e352c/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index e133433..b2b1bc9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -1686,7 +1686,9 @@ public class MetaDataClient { MutationCode code = result.getMutationCode(); switch(code) { case TABLE_ALREADY_EXISTS: - addTableToCache(result); + if (result.getTable() != null) { // Can happen for transactional table that already exists as HBase table + addTableToCache(result); + } if (!statement.ifNotExists()) { throw new TableAlreadyExistsException(schemaName, tableName, result.getTable()); } 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/995e352c/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java index c9574e3..a94e8ef 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java @@ -33,6 +33,7 @@ import java.util.Properties; import javax.annotation.Nullable; +import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.util.Bytes; @@ -370,6 +371,16 @@ public class SchemaUtil { .getName().getBytesPtr(); } + public static boolean isTransactional(HTableDescriptor descriptor) { + byte[] isTransactional = descriptor.getValue(PhoenixDatabaseMetaData.TRANSACTIONAL_BYTES); + return (isTransactional != null && Boolean.TRUE.toString().equalsIgnoreCase(Bytes.toString(isTransactional))); + } + + public static boolean hasTransactional(HTableDescriptor descriptor) { + byte[] isTransactional = descriptor.getValue(PhoenixDatabaseMetaData.TRANSACTIONAL_BYTES); + return (isTransactional != null); + } + public static boolean isMetaTable(byte[] tableName) { return Bytes.compareTo(tableName, SYSTEM_CATALOG_NAME_BYTES) == 0; }
