http://git-wip-us.apache.org/repos/asf/phoenix/blob/fd9af912/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java index 4f08846..61be561 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java @@ -109,7 +109,6 @@ import org.apache.phoenix.schema.types.PVarbinary; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.ExpressionUtil; import org.apache.phoenix.util.IndexUtil; -import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.SchemaUtil; @@ -541,7 +540,11 @@ public class UpsertCompiler { // Disable running upsert select on server side if a table has global mutable secondary indexes on it boolean hasGlobalMutableIndexes = SchemaUtil.hasGlobalIndex(table) && !table.isImmutableRows(); boolean hasWhereSubquery = select.getWhere() != null && select.getWhere().hasSubquery(); - runOnServer = (sameTable || (serverUpsertSelectEnabled && !hasGlobalMutableIndexes)) && isAutoCommit && !table.isTransactional() + runOnServer = (sameTable || (serverUpsertSelectEnabled && !hasGlobalMutableIndexes)) && isAutoCommit + // We can run the upsert select for initial index population on server side for transactional + // tables since the writes do not need to be done transactionally, since we gate the index + // usage on successfully writing all data rows. + && (!table.isTransactional() || table.getType() == PTableType.INDEX) && !(table.isImmutableRows() && !table.getIndexes().isEmpty()) && !select.isJoin() && !hasWhereSubquery && table.getRowTimestampColPos() == -1; } @@ -1039,12 +1042,15 @@ public class UpsertCompiler { byte[] txState = table.isTransactional() ? connection.getMutationState().encodeTransaction() : ByteUtil.EMPTY_BYTE_ARRAY; + ScanUtil.setClientVersion(scan, MetaDataProtocol.PHOENIX_VERSION); + if (aggPlan.getTableRef().getTable().isTransactional() + || (table.getType() == PTableType.INDEX && table.isTransactional())) { + scan.setAttribute(BaseScannerRegionObserver.TX_STATE, txState); + } if (ptr.getLength() > 0) { byte[] uuidValue = ServerCacheClient.generateId(); scan.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue); scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ptr.get()); - scan.setAttribute(BaseScannerRegionObserver.TX_STATE, txState); - ScanUtil.setClientVersion(scan, MetaDataProtocol.PHOENIX_VERSION); } ResultIterator iterator = aggPlan.iterator(); try {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/fd9af912/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidGCProcessor.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidGCProcessor.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidGCProcessor.java new file mode 100644 index 0000000..70658fb --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidGCProcessor.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.coprocessor; + +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; + + +public class OmidGCProcessor extends DelegateRegionObserver { + + public OmidGCProcessor() { + super(new BaseRegionObserver()); + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/fd9af912/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidTransactionalProcessor.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidTransactionalProcessor.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidTransactionalProcessor.java new file mode 100644 index 0000000..fc246d4 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidTransactionalProcessor.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.coprocessor; + +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; + + +public class OmidTransactionalProcessor extends DelegateRegionObserver { + + public OmidTransactionalProcessor() { + super(new BaseRegionObserver()); + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/fd9af912/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index c325d70..a667316 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -130,6 +130,8 @@ import org.apache.phoenix.schema.types.PDouble; import org.apache.phoenix.schema.types.PFloat; import org.apache.phoenix.schema.types.PLong; import org.apache.phoenix.transaction.PhoenixTransactionContext; +import org.apache.phoenix.transaction.PhoenixTransactionProvider; +import org.apache.phoenix.transaction.TransactionFactory; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.EncodedColumnsUtil; import org.apache.phoenix.util.EnvironmentEdgeManager; @@ -429,6 +431,12 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver byte[] replayMutations = scan.getAttribute(BaseScannerRegionObserver.REPLAY_WRITES); byte[] indexUUID = scan.getAttribute(PhoenixIndexCodec.INDEX_UUID); byte[] txState = scan.getAttribute(BaseScannerRegionObserver.TX_STATE); + byte[] clientVersionBytes = scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION); + PhoenixTransactionProvider txnProvider = null; + if (txState != null) { + int clientVersion = clientVersionBytes == null ? ScanUtil.UNKNOWN_CLIENT_VERSION : Bytes.toInt(clientVersionBytes); + txnProvider = TransactionFactory.getTransactionProvider(txState, clientVersion); + } List<Expression> selectExpressions = null; byte[] upsertSelectTable = scan.getAttribute(BaseScannerRegionObserver.UPSERT_SELECT_TABLE); boolean isUpsert = false; @@ -535,7 +543,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver useIndexProto = false; } - byte[] clientVersionBytes = scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION); if(needToWrite) { synchronized (lock) { if (isRegionClosingOrSplitting) { @@ -658,6 +665,10 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver valueGetter, ptr, results.get(0).getTimestamp(), env.getRegion().getRegionInfo().getStartKey(), env.getRegion().getRegionInfo().getEndKey()); + + if (txnProvider != null) { + put = txnProvider.markPutAsCommitted(put, ts, ts); + } indexMutations.add(put); } } @@ -725,6 +736,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver for (Mutation mutation : row.toRowMutations()) { if (replayMutations != null) { mutation.setAttribute(REPLAY_WRITES, replayMutations); + } else if (txnProvider != null && projectedTable.getType() == PTableType.INDEX) { + mutation = txnProvider.markPutAsCommitted((Put)mutation, ts, ts); } mutations.add(mutation); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/fd9af912/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java index d6a70f2..c0a81ec 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java @@ -297,7 +297,12 @@ public enum SQLExceptionCode { UNKNOWN_TRANSACTION_PROVIDER(1089,"44A20", "Unknown TRANSACTION_PROVIDER: "), CANNOT_START_TXN_IF_TXN_DISABLED(1091, "44A22", "Cannot start transaction if transactions are disabled."), CANNOT_MIX_TXN_PROVIDERS(1092, "44A23", "Cannot mix transaction providers: "), - CANNOT_ALTER_TABLE_FROM_NON_TXN_TO_TXNL(1093, "44A24", "Cannot alter table from non transactional to transactional for "), + CANNOT_ALTER_TABLE_FROM_NON_TXN_TO_TXNL(1093, "44A24", "Cannot alter table from non transactional to transactional for"), + UNSUPPORTED_COLUMN_ENCODING_FOR_TXN_PROVIDER(1094, "44A25", "Column encoding is not supported for"), + UNSUPPORTED_STORAGE_FORMAT_FOR_TXN_PROVIDER(1095, "44A26", "Only ONE_CELL_PER_COLUMN storage scheme is supported for"), + CANNOT_SWITCH_TXN_PROVIDERS(1096, "44A27", "Cannot switch transaction providers."), + TTL_UNSUPPORTED_FOR_TXN_TABLE(10947, "44A28", "TTL is not supported for"), + CANNOT_CREATE_LOCAL_INDEX_FOR_TXN_TABLE(10948, "44A29", "Local indexes cannot be created for"), /** Sequence related */ SEQUENCE_ALREADY_EXIST(1200, "42Z00", "Sequence already exists.", new Factory() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/fd9af912/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateHTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateHTable.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateHTable.java index f45b356..0618945 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateHTable.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateHTable.java @@ -28,7 +28,6 @@ import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; @@ -36,6 +35,7 @@ import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Row; import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.coprocessor.Batch.Call; import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; @@ -46,19 +46,14 @@ import com.google.protobuf.Message; import com.google.protobuf.Service; import com.google.protobuf.ServiceException; -public class DelegateHTable implements HTableInterface { - protected final HTableInterface delegate; +public class DelegateHTable implements Table { + protected final Table delegate; - public DelegateHTable(HTableInterface delegate) { + public DelegateHTable(Table delegate) { this.delegate = delegate; } @Override - public byte[] getTableName() { - return delegate.getTableName(); - } - - @Override public TableName getName() { return delegate.getName(); } @@ -79,11 +74,6 @@ public class DelegateHTable implements HTableInterface { } @Override - public Boolean[] exists(List<Get> gets) throws IOException { - return delegate.exists(gets); - } - - @Override public void batch(List<? extends Row> actions, Object[] results) throws IOException, InterruptedException { delegate.batch(actions, results); } @@ -117,12 +107,6 @@ public class DelegateHTable implements HTableInterface { return delegate.get(gets); } - @SuppressWarnings("deprecation") - @Override - public Result getRowOrBefore(byte[] row, byte[] family) throws IOException { - return delegate.getRowOrBefore(row, family); - } - @Override public ResultScanner getScanner(Scan scan) throws IOException { return delegate.getScanner(scan); @@ -195,23 +179,6 @@ public class DelegateHTable implements HTableInterface { return delegate.incrementColumnValue(row, family, qualifier, amount, durability); } - @SuppressWarnings("deprecation") - @Override - public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, boolean writeToWAL) - throws IOException { - return delegate.incrementColumnValue(row, family, qualifier, amount, writeToWAL); - } - - @Override - public boolean isAutoFlush() { - return delegate.isAutoFlush(); - } - - @Override - public void flushCommits() throws IOException { - delegate.flushCommits(); - } - @Override public void close() throws IOException { delegate.close(); @@ -234,22 +201,6 @@ public class DelegateHTable implements HTableInterface { delegate.coprocessorService(service, startKey, endKey, callable, callback); } - @SuppressWarnings("deprecation") - @Override - public void setAutoFlush(boolean autoFlush) { - delegate.setAutoFlush(autoFlush); - } - - @Override - public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) { - delegate.setAutoFlush(autoFlush, clearBufferOnFail); - } - - @Override - public void setAutoFlushTo(boolean autoFlush) { - delegate.setAutoFlushTo(autoFlush); - } - @Override public long getWriteBufferSize() { return delegate.getWriteBufferSize(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/fd9af912/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index d2d1eea..14f13b3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -39,15 +39,13 @@ import javax.annotation.Nonnull; import javax.annotation.concurrent.Immutable; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.htrace.Span; import org.apache.htrace.TraceScope; -import org.apache.phoenix.cache.IndexMetaDataCache; -import org.apache.phoenix.cache.ServerCacheClient; import org.apache.phoenix.cache.ServerCacheClient.ServerCache; import org.apache.phoenix.compile.MutationPlan; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; @@ -294,10 +292,11 @@ public class MutationState implements SQLCloseable { // the Transaction outside of MutationState, this seems reasonable, as the member variables // would not change as these threads are running. We also clone mutationState to ensure that // the transaction context won't change due to a commit when auto commit is true. - public HTableInterface getHTable(PTable table) throws SQLException { - HTableInterface htable = this.getConnection().getQueryServices().getTable(table.getPhysicalName().getBytes()); + public Table getHTable(PTable table) throws SQLException { + Table htable = this.getConnection().getQueryServices().getTable(table.getPhysicalName().getBytes()); if (table.isTransactional() && phoenixTransactionContext.isTransactionRunning()) { - htable = phoenixTransactionContext.getTransactionalTable(htable, table.isImmutableRows()); + // We're only using this table for reading, so we want it wrapped even if it's an index + htable = phoenixTransactionContext.getTransactionalTable(htable, table.isImmutableRows() || table.getType() == PTableType.INDEX); } return htable; } @@ -533,7 +532,7 @@ public class MutationState implements SQLCloseable { if (indexMutationsMap == null) { PhoenixTxIndexMutationGenerator generator = PhoenixTxIndexMutationGenerator.newGenerator(connection, table, indexList, mutationsPertainingToIndex.get(0).getAttributesMap()); - try (HTableInterface htable = connection.getQueryServices().getTable( + try (Table htable = connection.getQueryServices().getTable( table.getPhysicalName().getBytes())) { Collection<Pair<Mutation, byte[]>> allMutations = generator.getIndexUpdates(htable, mutationsPertainingToIndex.iterator()); @@ -562,8 +561,7 @@ public class MutationState implements SQLCloseable { MultiRowMutationState multiRowMutationState = mutations.remove(key); if (multiRowMutationState != null) { final List<Mutation> deleteMutations = Lists.newArrayList(); - generateMutations(tableRef, mutationTimestamp, serverTimestamp, multiRowMutationState, - deleteMutations, null); + generateMutations(key, mutationTimestamp, serverTimestamp, multiRowMutationState, deleteMutations, null); if (indexMutations == null) { indexMutations = deleteMutations; } else { @@ -960,7 +958,7 @@ public class MutationState implements SQLCloseable { // region servers. shouldRetry = cache != null; SQLException sqlE = null; - HTableInterface hTable = connection.getQueryServices().getTable(htableName); + Table hTable = connection.getQueryServices().getTable(htableName); try { if (table.isTransactional()) { // Track tables to which we've sent uncommitted data @@ -968,7 +966,12 @@ public class MutationState implements SQLCloseable { uncommittedPhysicalNames.add(table.getPhysicalName().getString()); phoenixTransactionContext.markDMLFence(table); } - hTable = phoenixTransactionContext.getTransactionalTableWriter(connection, table, hTable, !tableInfo.isDataTable()); + // Only pass true for last argument if the index is being written to on it's own (i.e. initial + // index population), not if it's being written to for normal maintenance due to writes to + // the data table. This case is different because the initial index population does not need + // to be done transactionally since the index is only made active after all writes have + // occurred successfully. + hTable = phoenixTransactionContext.getTransactionalTableWriter(connection, table, hTable, tableInfo.isDataTable() && table.getType() == PTableType.INDEX); } numMutations = mutationList.size(); GLOBAL_MUTATION_BATCH_SIZE.update(numMutations); @@ -981,7 +984,7 @@ public class MutationState implements SQLCloseable { for (final List<Mutation> mutationBatch : mutationBatchList) { if (shouldRetryIndexedMutation) { // if there was an index write failure, retry the mutation in a loop - final HTableInterface finalHTable = hTable; + final Table finalHTable = hTable; PhoenixIndexFailurePolicy.doBatchWithRetries(new MutateCommand() { @Override public void doMutation() throws IOException { http://git-wip-us.apache.org/repos/asf/phoenix/blob/fd9af912/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java index 877c939..8a94314 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java @@ -34,9 +34,9 @@ import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; -import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -62,6 +62,7 @@ import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.index.PhoenixIndexCodec; import org.apache.phoenix.index.PhoenixIndexMetaData; import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.query.ConnectionQueryServices; import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.types.PVarbinary; @@ -79,14 +80,26 @@ import com.google.common.primitives.Longs; public class PhoenixTxIndexMutationGenerator { private final PhoenixIndexCodec codec; private final PhoenixIndexMetaData indexMetaData; + private final ConnectionQueryServices services; + private final byte[] regionStartKey; + private final byte[] regionEndKey; + private final byte[] tableName; - public PhoenixTxIndexMutationGenerator(Configuration conf, PhoenixIndexMetaData indexMetaData, byte[] tableName, byte[] regionStartKey, byte[] regionEndKey) { + private PhoenixTxIndexMutationGenerator(ConnectionQueryServices services, Configuration conf, PhoenixIndexMetaData indexMetaData, byte[] tableName, byte[] regionStartKey, byte[] regionEndKey) { + this.services = services; this.indexMetaData = indexMetaData; - this.codec = new PhoenixIndexCodec(conf, regionStartKey, regionEndKey, tableName); + this.regionStartKey = regionStartKey; + this.regionEndKey = regionEndKey; + this.tableName = tableName; + this.codec = new PhoenixIndexCodec(conf, tableName); } - public PhoenixTxIndexMutationGenerator(Configuration conf, PhoenixIndexMetaData indexMetaData, byte[] tableName) { - this(conf, indexMetaData, tableName, null, null); + public PhoenixTxIndexMutationGenerator(Configuration conf, PhoenixIndexMetaData indexMetaData, byte[] tableName, byte[] regionStartKey, byte[] regionEndKey) { + this(null, conf, indexMetaData, tableName, regionStartKey, regionEndKey); + } + + public PhoenixTxIndexMutationGenerator(ConnectionQueryServices services, PhoenixIndexMetaData indexMetaData, byte[] tableName) { + this(services, services.getConfiguration(), indexMetaData, tableName, null, null); } private static void addMutation(Map<ImmutableBytesPtr, MultiMutation> mutations, ImmutableBytesPtr row, Mutation m) { @@ -99,7 +112,7 @@ public class PhoenixTxIndexMutationGenerator { stored.addAll(m); } - public Collection<Pair<Mutation, byte[]>> getIndexUpdates(HTableInterface htable, Iterator<? extends Mutation> mutationIterator) throws IOException, SQLException { + public Collection<Pair<Mutation, byte[]>> getIndexUpdates(Table htable, Iterator<? extends Mutation> mutationIterator) throws IOException, SQLException { if (!mutationIterator.hasNext()) { return Collections.emptyList(); @@ -179,7 +192,7 @@ public class PhoenixTxIndexMutationGenerator { scan.addColumn(indexMaintainers.get(0).getDataEmptyKeyValueCF(), emptyKeyValueQualifier); ScanRanges scanRanges = ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN, null, true, -1); scanRanges.initializeScan(scan); - Table txTable = indexMetaData.getTransactionContext().getTransactionalTable(htable, isImmutable); + Table txTable = indexMetaData.getTransactionContext().getTransactionalTable(htable, true); // For rollback, we need to see all versions, including // the last committed version as there may be multiple // checkpointed versions. @@ -313,7 +326,18 @@ public class PhoenixTxIndexMutationGenerator { private void generateDeletes(PhoenixIndexMetaData indexMetaData, Collection<Pair<Mutation, byte[]>> indexUpdates, byte[] attribValue, TxTableState state) throws IOException { - Iterable<IndexUpdate> deletes = codec.getIndexDeletes(state, indexMetaData); + byte[] regionStartKey = this.regionStartKey; + byte[] regionEndKey = this.regionEndKey; + if (services != null && indexMetaData.hasLocalIndexes()) { + try { + HRegionLocation tableRegionLocation = services.getTableRegionLocation(tableName, state.getCurrentRowKey()); + regionStartKey = tableRegionLocation.getRegionInfo().getStartKey(); + regionEndKey = tableRegionLocation.getRegionInfo().getEndKey(); + } catch (SQLException e) { + throw new IOException(e); + } + } + Iterable<IndexUpdate> deletes = codec.getIndexDeletes(state, indexMetaData, regionStartKey, regionEndKey); for (IndexUpdate delete : deletes) { if (delete.isValid()) { delete.getUpdate().setAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY, attribValue); @@ -328,7 +352,18 @@ public class PhoenixTxIndexMutationGenerator { TxTableState state) throws IOException { state.applyMutation(); - Iterable<IndexUpdate> puts = codec.getIndexUpserts(state, indexMetaData); + byte[] regionStartKey = this.regionStartKey; + byte[] regionEndKey = this.regionEndKey; + if (services != null && indexMetaData.hasLocalIndexes()) { + try { + HRegionLocation tableRegionLocation = services.getTableRegionLocation(tableName, state.getCurrentRowKey()); + regionStartKey = tableRegionLocation.getRegionInfo().getStartKey(); + regionEndKey = tableRegionLocation.getRegionInfo().getEndKey(); + } catch (SQLException e) { + throw new IOException(e); + } + } + Iterable<IndexUpdate> puts = codec.getIndexUpserts(state, indexMetaData, regionStartKey, regionEndKey); boolean validPut = false; for (IndexUpdate put : puts) { if (put.isValid()) { @@ -476,8 +511,8 @@ public class PhoenixTxIndexMutationGenerator { }; try { PhoenixIndexMetaData indexMetaData = new PhoenixIndexMetaData(indexMetaDataCache, attributes); - return new PhoenixTxIndexMutationGenerator(connection.getQueryServices().getConfiguration(), indexMetaData, - table.getPhysicalName().getBytes()); + return new PhoenixTxIndexMutationGenerator(connection.getQueryServices(),connection.getQueryServices().getConfiguration(), indexMetaData, + table.getPhysicalName().getBytes(), null, null); } catch (IOException e) { throw new RuntimeException(e); // Impossible } http://git-wip-us.apache.org/repos/asf/phoenix/blob/fd9af912/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java index f13e97a..0ff83ca 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java @@ -58,7 +58,7 @@ public abstract class BaseIndexBuilder implements IndexBuilder { Constructor<? extends IndexCodec> meth = codecClass.getDeclaredConstructor(new Class[0]); meth.setAccessible(true); this.codec = meth.newInstance(); - this.codec.initialize(conf, env.getRegionInfo().getStartKey(), env.getRegionInfo().getEndKey(), env.getRegion().getRegionInfo().getTable().getName()); + this.codec.initialize(conf, env.getRegion().getRegionInfo().getTable().getName()); } catch (Exception e) { throw new IOException(e); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/fd9af912/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java index 7dde941..0b70de3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java @@ -39,10 +39,12 @@ public interface IndexCodec { * the current state of the table that needs to be cleaned up. Generally, you only care about the latest * column values, for each column you are indexing for each index table. * @param context TODO + * @param regionStartKey TODO + * @param regionEndKey TODO * @return the pairs of (deletes, index table name) that should be applied. * @throws IOException */ - public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context) throws IOException; + public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context, byte[] regionStartKey, byte[] regionEndKey) throws IOException; // table state has the pending update already applied, before calling // get the new index entries @@ -59,10 +61,12 @@ public interface IndexCodec { * the current state of the table that needs to an index update Generally, you only care about the latest * column values, for each column you are indexing for each index table. * @param context TODO + * @param regionStartKey TODO + * @param regionEndKey TODO * @return the pairs of (updates,index table name) that should be applied. * @throws IOException */ - public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData context) throws IOException; + public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData context, byte[] regionStartKey, byte[] regionEndKey) throws IOException; /** * This allows the codec to dynamically change whether or not indexing should take place for a table. If it doesn't @@ -80,5 +84,5 @@ public interface IndexCodec { */ public boolean isEnabled(Mutation m) throws IOException; - public void initialize(Configuration conf, byte[] startKey, byte[] endKey, byte[] tableName); + public void initialize(Configuration conf, byte[] tableName); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/fd9af912/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java index 4f65416..8a069f8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java @@ -136,7 +136,7 @@ public class LocalTableState implements TableState { * state for any of the columns you are indexing. * <p> * <i>NOTE:</i> This method should <b>not</b> be used during - * {@link IndexCodec#getIndexDeletes(TableState, BatchState)} as the pending update will not yet have been + * {@link IndexCodec#getIndexDeletes(TableState, BatchState, byte[], byte[])} as the pending update will not yet have been * applied - you are merely attempting to cleanup the current state and therefore do <i>not</i> * need to track the indexed columns. * <p> http://git-wip-us.apache.org/repos/asf/phoenix/blob/fd9af912/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java index 97ac30d..820a475 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java @@ -186,7 +186,7 @@ public class NonTxIndexBuilder extends BaseIndexBuilder { throws IOException { // get the index updates for this current batch - Iterable<IndexUpdate> upserts = codec.getIndexUpserts(state, indexMetaData); + Iterable<IndexUpdate> upserts = codec.getIndexUpserts(state, indexMetaData, env.getRegionInfo().getStartKey(), env.getRegionInfo().getEndKey()); state.resetTrackedColumns(); /* @@ -224,7 +224,7 @@ public class NonTxIndexBuilder extends BaseIndexBuilder { } /** - * Get the index deletes from the codec {@link IndexCodec#getIndexDeletes(TableState, IndexMetaData)} and then add them to the + * Get the index deletes from the codec {@link IndexCodec#getIndexDeletes(TableState, IndexMetaData, byte[], byte[])} and then add them to the * update map. * <p> * Expects the {@link LocalTableState} to already be correctly setup (correct timestamp, updates applied, etc). @@ -234,7 +234,7 @@ public class NonTxIndexBuilder extends BaseIndexBuilder { */ protected void addDeleteUpdatesToMap(IndexUpdateManager updateMap, LocalTableState state, long ts, IndexMetaData indexMetaData) throws IOException { - Iterable<IndexUpdate> cleanup = codec.getIndexDeletes(state, indexMetaData); + Iterable<IndexUpdate> cleanup = codec.getIndexDeletes(state, indexMetaData, env.getRegionInfo().getStartKey(), env.getRegionInfo().getEndKey()); if (cleanup != null) { for (IndexUpdate d : cleanup) { if (!d.isValid()) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/fd9af912/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java index 140c304..d1da5f1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java @@ -101,7 +101,7 @@ import org.apache.phoenix.schema.ValueSchema.Field; import org.apache.phoenix.schema.tuple.BaseTuple; import org.apache.phoenix.schema.tuple.ValueGetterTuple; import org.apache.phoenix.schema.types.PDataType; -import org.apache.phoenix.schema.types.PLong; +import org.apache.phoenix.transaction.PhoenixTransactionProvider.Feature; import org.apache.phoenix.util.BitSet; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.EncodedColumnsUtil; @@ -184,9 +184,24 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { */ public static void serialize(PTable dataTable, ImmutableBytesWritable ptr, PhoenixConnection connection) { List<PTable> indexes = dataTable.getIndexes(); - serialize(dataTable, ptr, indexes, connection); + serializeServerMaintainedIndexes(dataTable, ptr, indexes, connection); } + public static void serializeServerMaintainedIndexes(PTable dataTable, ImmutableBytesWritable ptr, + List<PTable> indexes, PhoenixConnection connection) { + Iterator<PTable> indexesItr = Collections.emptyListIterator(); + boolean onlyLocalIndexes = dataTable.isImmutableRows() || dataTable.isTransactional(); + if (onlyLocalIndexes) { + if (!dataTable.isTransactional() + || !dataTable.getTransactionProvider().getTransactionProvider().isUnsupported(Feature.MAINTAIN_LOCAL_INDEX_ON_SERVER)) { + indexesItr = maintainedLocalIndexes(indexes.iterator()); + } + } else { + indexesItr = maintainedIndexes(indexes.iterator()); + } + + serialize(dataTable, ptr, Lists.newArrayList(indexesItr), connection); + } /** * For client-side to serialize all IndexMaintainers for a given table * @param dataTable data table @@ -195,22 +210,11 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { */ public static void serialize(PTable dataTable, ImmutableBytesWritable ptr, List<PTable> indexes, PhoenixConnection connection) { - Iterator<PTable> indexesItr; - boolean onlyLocalIndexes = dataTable.isImmutableRows() || dataTable.isTransactional(); - if (onlyLocalIndexes) { - indexesItr = maintainedLocalIndexes(indexes.iterator()); - } else { - indexesItr = maintainedIndexes(indexes.iterator()); - } - if (!indexesItr.hasNext()) { + if (indexes.isEmpty()) { ptr.set(ByteUtil.EMPTY_BYTE_ARRAY); return; } - int nIndexes = 0; - while (indexesItr.hasNext()) { - nIndexes++; - indexesItr.next(); - } + int nIndexes = indexes.size(); ByteArrayOutputStream stream = new ByteArrayOutputStream(); DataOutputStream output = new DataOutputStream(stream); try { @@ -218,11 +222,8 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { WritableUtils.writeVInt(output, nIndexes * (dataTable.getBucketNum() == null ? 1 : -1)); // Write out data row key schema once, since it's the same for all index maintainers dataTable.getRowKeySchema().write(output); - indexesItr = onlyLocalIndexes - ? maintainedLocalIndexes(indexes.iterator()) - : maintainedIndexes(indexes.iterator()); - while (indexesItr.hasNext()) { - org.apache.phoenix.coprocessor.generated.ServerCachingProtos.IndexMaintainer proto = IndexMaintainer.toProto(indexesItr.next().getIndexMaintainer(dataTable, connection)); + for (PTable index : indexes) { + org.apache.phoenix.coprocessor.generated.ServerCachingProtos.IndexMaintainer proto = IndexMaintainer.toProto(index.getIndexMaintainer(dataTable, connection)); byte[] protoBytes = proto.toByteArray(); WritableUtils.writeVInt(output, protoBytes.length); output.write(protoBytes); @@ -285,31 +286,33 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { } private static List<IndexMaintainer> deserialize(byte[] buf, int offset, int length, boolean useProtoForIndexMaintainer) { - ByteArrayInputStream stream = new ByteArrayInputStream(buf, offset, length); - DataInput input = new DataInputStream(stream); List<IndexMaintainer> maintainers = Collections.emptyList(); - try { - int size = WritableUtils.readVInt(input); - boolean isDataTableSalted = size < 0; - size = Math.abs(size); - RowKeySchema rowKeySchema = new RowKeySchema(); - rowKeySchema.readFields(input); - maintainers = Lists.newArrayListWithExpectedSize(size); - for (int i = 0; i < size; i++) { - if (useProtoForIndexMaintainer) { - int protoSize = WritableUtils.readVInt(input); - byte[] b = new byte[protoSize]; - input.readFully(b); - org.apache.phoenix.coprocessor.generated.ServerCachingProtos.IndexMaintainer proto = ServerCachingProtos.IndexMaintainer.parseFrom(b); - maintainers.add(IndexMaintainer.fromProto(proto, rowKeySchema, isDataTableSalted)); - } else { - IndexMaintainer maintainer = new IndexMaintainer(rowKeySchema, isDataTableSalted); - maintainer.readFields(input); - maintainers.add(maintainer); + if (length > 0) { + ByteArrayInputStream stream = new ByteArrayInputStream(buf, offset, length); + DataInput input = new DataInputStream(stream); + try { + int size = WritableUtils.readVInt(input); + boolean isDataTableSalted = size < 0; + size = Math.abs(size); + RowKeySchema rowKeySchema = new RowKeySchema(); + rowKeySchema.readFields(input); + maintainers = Lists.newArrayListWithExpectedSize(size); + for (int i = 0; i < size; i++) { + if (useProtoForIndexMaintainer) { + int protoSize = WritableUtils.readVInt(input); + byte[] b = new byte[protoSize]; + input.readFully(b); + org.apache.phoenix.coprocessor.generated.ServerCachingProtos.IndexMaintainer proto = ServerCachingProtos.IndexMaintainer.parseFrom(b); + maintainers.add(IndexMaintainer.fromProto(proto, rowKeySchema, isDataTableSalted)); + } else { + IndexMaintainer maintainer = new IndexMaintainer(rowKeySchema, isDataTableSalted); + maintainer.readFields(input); + maintainers.add(maintainer); + } } + } catch (IOException e) { + throw new RuntimeException(e); // Impossible } - } catch (IOException e) { - throw new RuntimeException(e); // Impossible } return maintainers; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/fd9af912/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java index d33e3fe..eb911e1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java @@ -36,8 +36,8 @@ import com.google.common.collect.Sets; /** * Phoenix-based {@link IndexCodec}. Manages all the logic of how to cleanup an index ( - * {@link #getIndexDeletes(TableState, IndexMetaData)}) as well as what the new index state should be ( - * {@link #getIndexUpserts(TableState, IndexMetaData)}). + * {@link #getIndexDeletes(TableState, IndexMetaData, byte[], byte[])}) as well as what the new index state should be ( + * {@link #getIndexUpserts(TableState, IndexMetaData, byte[], byte[])}). */ public class PhoenixIndexCodec extends BaseIndexCodec { public static final String INDEX_MD = "IdxMD"; @@ -46,23 +46,19 @@ public class PhoenixIndexCodec extends BaseIndexCodec { public static final String INDEX_MAINTAINERS = "IndexMaintainers"; public static KeyValueBuilder KV_BUILDER = GenericKeyValueBuilder.INSTANCE; - private byte[] regionStartKey; - private byte[] regionEndKey; private byte[] tableName; public PhoenixIndexCodec() { } - public PhoenixIndexCodec(Configuration conf, byte[] regionStartKey, byte[] regionEndKey, byte[] tableName) { - initialize(conf, regionStartKey, regionEndKey, tableName); + public PhoenixIndexCodec(Configuration conf, byte[] tableName) { + initialize(conf, tableName); } @Override - public void initialize(Configuration conf, byte[] regionStartKey, byte[] regionEndKey, byte[] tableName) { - this.regionStartKey = regionStartKey; - this.regionEndKey = regionEndKey; + public void initialize(Configuration conf, byte[] tableName) { this.tableName = tableName; } @@ -74,7 +70,7 @@ public class PhoenixIndexCodec extends BaseIndexCodec { } @Override - public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData context) throws IOException { + public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData context, byte[] regionStartKey, byte[] regionEndKey) throws IOException { PhoenixIndexMetaData metaData = (PhoenixIndexMetaData)context; List<IndexMaintainer> indexMaintainers = metaData.getIndexMaintainers(); if (indexMaintainers.get(0).isRowDeleted(state.getPendingUpdate())) { @@ -97,7 +93,7 @@ public class PhoenixIndexCodec extends BaseIndexCodec { } @Override - public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context) throws IOException { + public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context, byte[] regionStartKey, byte[] regionEndKey) throws IOException { PhoenixIndexMetaData metaData = (PhoenixIndexMetaData)context; List<IndexMaintainer> indexMaintainers = metaData.getIndexMaintainers(); ImmutableBytesWritable ptr = new ImmutableBytesWritable(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/fd9af912/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java index 56db39b..3543da8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java @@ -94,7 +94,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { final RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment)e; Configuration conf = e.getConfiguration(); String serverName = env.getRegionServerServices().getServerName().getServerName(); - codec = new PhoenixIndexCodec(conf, env.getRegion().getRegionInfo().getStartKey(), env.getRegion().getRegionInfo().getEndKey(), env.getRegionInfo().getTable().getName()); + codec = new PhoenixIndexCodec(conf, env.getRegionInfo().getTable().getName()); DelegateRegionCoprocessorEnvironment indexWriterEnv = new DelegateRegionCoprocessorEnvironment(env, ConnectionType.INDEX_WRITER_CONNECTION); // setup the actual index writer // For transactional tables, we keep the index active upon a write failure http://git-wip-us.apache.org/repos/asf/phoenix/blob/fd9af912/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java index 9a31238..f02e9d3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java @@ -38,6 +38,7 @@ import java.util.Map; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.phoenix.monitoring.CombinableMetric; import org.apache.phoenix.monitoring.ScanMetricsHolder; import org.apache.phoenix.schema.tuple.ResultTuple; @@ -89,7 +90,11 @@ public class ScanningResultIterator implements ResultIterator { private void getScanMetrics() { if (!scanMetricsUpdated && scanMetricsEnabled) { - Map<String, Long> scanMetricsMap = scan.getScanMetrics().getMetricsMap(); + ScanMetrics scanMetrics = scan.getScanMetrics(); + if (scanMetrics == null) { + return; + } + Map<String, Long> scanMetricsMap = scanMetrics.getMetricsMap(); if(scanMetricsMap == null) { return; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/fd9af912/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java index e6b94fb..cc99ae4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java @@ -37,8 +37,8 @@ import java.util.concurrent.locks.ReentrantLock; import javax.annotation.concurrent.GuardedBy; import org.apache.hadoop.hbase.client.AbstractClientScanner; -import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.cache.ServerCacheClient.ServerCache; @@ -51,7 +51,6 @@ import org.apache.phoenix.join.HashCacheClient; import org.apache.phoenix.monitoring.ScanMetricsHolder; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.PTable; -import org.apache.phoenix.schema.StaleRegionBoundaryCacheException; import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.Closeables; @@ -74,7 +73,7 @@ import com.google.common.annotations.VisibleForTesting; */ public class TableResultIterator implements ResultIterator { private final Scan scan; - private final HTableInterface htable; + private final Table htable; private final ScanMetricsHolder scanMetricsHolder; private static final ResultIterator UNINITIALIZED_SCANNER = ResultIterator.EMPTY_ITERATOR; private final long renewLeaseThreshold; @@ -187,13 +186,13 @@ public class TableResultIterator implements ResultIterator { newScan.setStartRow(ByteUtil.nextKey(startRowSuffix)); } } - plan.getContext().getConnection().getQueryServices().clearTableRegionCache(htable.getTableName()); + plan.getContext().getConnection().getQueryServices().clearTableRegionCache(htable.getName().getName()); logger.debug( "Retrying when Hash Join cache is not found on the server ,by sending the cache again"); if (retry <= 0) { throw e1; } - Long cacheId = ((HashJoinCacheNotFoundException) e1).getCacheId(); + Long cacheId = e1.getCacheId(); retry--; try { ServerCache cache = caches == null ? null : http://git-wip-us.apache.org/repos/asf/phoenix/blob/fd9af912/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/DirectHTableWriter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/DirectHTableWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/DirectHTableWriter.java index 59b26b2..1230f25 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/DirectHTableWriter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/DirectHTableWriter.java @@ -24,10 +24,16 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; +import org.apache.phoenix.transaction.PhoenixTransactionProvider; +import org.apache.phoenix.transaction.TransactionFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.Lists; + /** * Writes mutations directly to HBase using HBase front-door APIs. */ @@ -62,7 +68,24 @@ public class DirectHTableWriter { public void write(List<Mutation> mutations) throws IOException, InterruptedException { Object[] results = new Object[mutations.size()]; - table.batch(mutations, results); + String txnIdStr = conf.get(PhoenixConfigurationUtil.TX_SCN_VALUE); + if (txnIdStr == null) { + table.batch(mutations, results); + } else { + long ts = Long.parseLong(txnIdStr); + PhoenixTransactionProvider provider = TransactionFactory.Provider.getDefault().getTransactionProvider(); + String txnProviderStr = conf.get(PhoenixConfigurationUtil.TX_PROVIDER); + if (txnProviderStr != null) { + provider = TransactionFactory.Provider.valueOf(txnProviderStr).getTransactionProvider(); + } + List<Mutation> shadowedMutations = Lists.newArrayListWithExpectedSize(mutations.size()); + for (Mutation m : mutations) { + if (m instanceof Put) { + shadowedMutations.add(provider.markPutAsCommitted((Put)m, ts, ts)); + } + } + table.batch(shadowedMutations, results); + } } protected Configuration getConf() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/fd9af912/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java index 15d41ea..f2f4cdb 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java @@ -381,6 +381,7 @@ public class IndexTool extends Configured implements Tool { if (pdataTable.isTransactional()) { configuration.set(PhoenixConfigurationUtil.TX_SCN_VALUE, Long.toString(TransactionUtil.convertToNanoseconds(maxTimeRange))); + configuration.set(PhoenixConfigurationUtil.TX_PROVIDER, pdataTable.getTransactionProvider().name()); } configuration.set(PhoenixConfigurationUtil.CURRENT_SCN_VALUE, Long.toString(maxTimeRange)); http://git-wip-us.apache.org/repos/asf/phoenix/blob/fd9af912/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java index 9e0d629..353de7a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java @@ -21,26 +21,37 @@ import java.io.IOException; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Properties; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Mapper; +import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.mapreduce.ImportPreUpsertKeyValueProcessor; import org.apache.phoenix.mapreduce.PhoenixJobCounters; import org.apache.phoenix.mapreduce.util.ConnectionUtil; import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; +import org.apache.phoenix.transaction.PhoenixTransactionProvider; +import org.apache.phoenix.transaction.TransactionFactory; import org.apache.phoenix.util.ColumnInfo; import org.apache.phoenix.util.PhoenixRuntime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.Lists; + /** * Mapper that hands over rows from data table to the index table. * @@ -93,6 +104,18 @@ public class PhoenixIndexImportMapper extends Mapper<NullWritable, PhoenixIndexD context.getCounter(PhoenixJobCounters.INPUT_RECORDS).increment(1); + PhoenixTransactionProvider provider = null; + Configuration conf = context.getConfiguration(); + long ts = HConstants.LATEST_TIMESTAMP; + String txnIdStr = conf.get(PhoenixConfigurationUtil.TX_SCN_VALUE); + if (txnIdStr != null) { + ts = Long.parseLong(txnIdStr); + provider = TransactionFactory.Provider.getDefault().getTransactionProvider(); + String txnProviderStr = conf.get(PhoenixConfigurationUtil.TX_PROVIDER); + if (txnProviderStr != null) { + provider = TransactionFactory.Provider.valueOf(txnProviderStr).getTransactionProvider(); + } + } try { final ImmutableBytesWritable outputKey = new ImmutableBytesWritable(); final List<Object> values = record.getValues(); @@ -100,16 +123,30 @@ public class PhoenixIndexImportMapper extends Mapper<NullWritable, PhoenixIndexD indxWritable.write(this.pStatement); this.pStatement.execute(); - final Iterator<Pair<byte[], List<KeyValue>>> uncommittedDataIterator = PhoenixRuntime.getUncommittedDataIterator(connection, true); - while (uncommittedDataIterator.hasNext()) { - Pair<byte[], List<KeyValue>> kvPair = uncommittedDataIterator.next(); - if (Bytes.compareTo(Bytes.toBytes(indexTableName), kvPair.getFirst()) != 0) { + PhoenixConnection pconn = connection.unwrap(PhoenixConnection.class); + final Iterator<Pair<byte[],List<Mutation>>> iterator = pconn.getMutationState().toMutations(true); + while (iterator.hasNext()) { + Pair<byte[], List<Mutation>> pair = iterator.next(); + if (Bytes.compareTo(Bytes.toBytes(indexTableName), pair.getFirst()) != 0) { // skip edits for other tables continue; } - List<KeyValue> keyValueList = kvPair.getSecond(); - keyValueList = preUpdateProcessor.preUpsert(kvPair.getFirst(), keyValueList); - for (KeyValue kv : keyValueList) { + List<KeyValue> keyValues = Lists.newArrayListWithExpectedSize(pair.getSecond().size() * 5); // Guess-timate 5 key values per row + for (Mutation mutation : pair.getSecond()) { + if (mutation instanceof Put) { + if (provider != null) { + mutation = provider.markPutAsCommitted((Put)mutation, ts, ts); + } + for (List<Cell> cellList : mutation.getFamilyCellMap().values()) { + List<KeyValue>keyValueList = preUpdateProcessor.preUpsert(mutation.getRow(), KeyValueUtil.ensureKeyValues(cellList)); + for (KeyValue keyValue : keyValueList) { + keyValues.add(keyValue); + } + } + } + } + Collections.sort(keyValues, pconn.getKeyValueBuilder().getKeyValueComparator()); + for (KeyValue kv : keyValues) { outputKey.set(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength()); context.write(outputKey, kv); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/fd9af912/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java index 3b63f66..2f552ea 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java @@ -105,6 +105,8 @@ public final class PhoenixConfigurationUtil { public static final String TX_SCN_VALUE = "phoenix.mr.txscn.value"; + public static final String TX_PROVIDER = "phoenix.mr.txprovider"; + /** Configuration key for the class name of an ImportPreUpsertKeyValueProcessor */ public static final String UPSERT_HOOK_CLASS_CONFKEY = "phoenix.mapreduce.import.kvprocessor"; http://git-wip-us.apache.org/repos/asf/phoenix/blob/fd9af912/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java index 0820232..d5a5199 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java @@ -168,7 +168,7 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated public QueryLoggerDisruptor getQueryDisruptor(); - public PhoenixTransactionClient initTransactionClient(TransactionFactory.Provider provider); + public PhoenixTransactionClient initTransactionClient(TransactionFactory.Provider provider) throws SQLException; /** * Writes a cell to SYSTEM.MUTEX using checkAndPut to ensure only a single client can execute a @@ -183,4 +183,4 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated */ public void deleteMutexCell(String tenantId, String schemaName, String tableName, String columnName, String familyName) throws SQLException; -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/fd9af912/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index dbfd461..ab4678a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -147,7 +147,6 @@ import org.apache.phoenix.coprocessor.MetaDataRegionObserver; import org.apache.phoenix.coprocessor.ScanRegionObserver; import org.apache.phoenix.coprocessor.SequenceRegionObserver; import org.apache.phoenix.coprocessor.ServerCachingEndpointImpl; -import org.apache.phoenix.coprocessor.TephraTransactionalProcessor; import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver; import org.apache.phoenix.coprocessor.generated.MetaDataProtos; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.AddColumnRequest; @@ -237,6 +236,7 @@ import org.apache.phoenix.schema.types.PVarbinary; import org.apache.phoenix.schema.types.PVarchar; import org.apache.phoenix.transaction.PhoenixTransactionClient; import org.apache.phoenix.transaction.PhoenixTransactionContext; +import org.apache.phoenix.transaction.PhoenixTransactionProvider; import org.apache.phoenix.transaction.TransactionFactory; import org.apache.phoenix.transaction.TransactionFactory.Provider; import org.apache.phoenix.util.ByteUtil; @@ -860,10 +860,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement if (!descriptor.hasCoprocessor(ServerCachingEndpointImpl.class.getName())) { descriptor.addCoprocessor(ServerCachingEndpointImpl.class.getName(), null, priority, null); } - // For ALTER TABLE - boolean nonTxToTx = Boolean.TRUE.equals(tableProps.get(PhoenixTransactionContext.READ_NON_TX_DATA)); - boolean isTransactional = - Boolean.TRUE.equals(tableProps.get(TableProperty.TRANSACTIONAL.name())) || nonTxToTx; + TransactionFactory.Provider provider = getTransactionProvider(tableProps); + boolean isTransactional = (provider != null); // TODO: better encapsulation for this // Since indexes can't have indexes, don't install our indexing coprocessor for indexes. // Also don't install on the SYSTEM.CATALOG and SYSTEM.STATS table because we use @@ -926,22 +924,27 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } if (isTransactional) { - TransactionFactory.Provider provider = (TransactionFactory.Provider)TableProperty.TRANSACTION_PROVIDER.getValue(tableProps); - if (provider == null) { - String providerValue = this.props.get(QueryServices.DEFAULT_TRANSACTION_PROVIDER_ATTRIB, QueryServicesOptions.DEFAULT_TRANSACTION_PROVIDER); - provider = (TransactionFactory.Provider)TableProperty.TRANSACTION_PROVIDER.getValue(providerValue); - } Class<? extends RegionObserver> coprocessorClass = provider.getTransactionProvider().getCoprocessor(); if (!descriptor.hasCoprocessor(coprocessorClass.getName())) { descriptor.addCoprocessor(coprocessorClass.getName(), null, priority - 10, null); } + Class<? extends RegionObserver> coprocessorGCClass = provider.getTransactionProvider().getGCCoprocessor(); + if (coprocessorGCClass != null) { + if (!descriptor.hasCoprocessor(coprocessorGCClass.getName())) { + descriptor.addCoprocessor(coprocessorGCClass.getName(), null, priority - 10, null); + } + } } else { // Remove all potential transactional coprocessors - for (TransactionFactory.Provider provider : TransactionFactory.Provider.values()) { - Class<? extends RegionObserver> coprocessorClass = provider.getTransactionProvider().getCoprocessor(); + for (TransactionFactory.Provider aprovider : TransactionFactory.Provider.values()) { + Class<? extends RegionObserver> coprocessorClass = aprovider.getTransactionProvider().getCoprocessor(); + Class<? extends RegionObserver> coprocessorGCClass = aprovider.getTransactionProvider().getGCCoprocessor(); if (coprocessorClass != null && descriptor.hasCoprocessor(coprocessorClass.getName())) { descriptor.removeCoprocessor(coprocessorClass.getName()); } + if (coprocessorGCClass != null && descriptor.hasCoprocessor(coprocessorGCClass.getName())) { + descriptor.removeCoprocessor(coprocessorGCClass.getName()); + } } } } catch (IOException e) { @@ -949,6 +952,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } } + private TransactionFactory.Provider getTransactionProvider(Map<String,Object> tableProps) { + TransactionFactory.Provider provider = (TransactionFactory.Provider)TableProperty.TRANSACTION_PROVIDER.getValue(tableProps); + return provider; + } + private static interface RetriableOperation { boolean checkForCompletion() throws TimeoutException, IOException; String getOperationName(); @@ -1171,15 +1179,30 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement if (!modifyExistingMetaData) { return existingDesc; // Caller already knows that no metadata was changed } - boolean willBeTx = Boolean.TRUE.equals(props.get(TableProperty.TRANSACTIONAL.name())); + TransactionFactory.Provider provider = getTransactionProvider(props); + boolean willBeTx = provider != null; // If mapping an existing table as transactional, set property so that existing // data is correctly read. if (willBeTx) { - newDesc.setValue(PhoenixTransactionContext.READ_NON_TX_DATA, Boolean.TRUE.toString()); + if (!equalTxCoprocessor(provider, existingDesc, newDesc)) { + // Cannot switch between different providers + if (hasTxCoprocessor(existingDesc)) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_SWITCH_TXN_PROVIDERS) + .setSchemaName(SchemaUtil.getSchemaNameFromFullName(physicalTableName)) + .setTableName(SchemaUtil.getTableNameFromFullName(physicalTableName)).build().buildException(); + } + if (provider.getTransactionProvider().isUnsupported(PhoenixTransactionProvider.Feature.ALTER_NONTX_TO_TX)) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ALTER_TABLE_FROM_NON_TXN_TO_TXNL) + .setMessage(provider.name()) + .setSchemaName(SchemaUtil.getSchemaNameFromFullName(physicalTableName)) + .setTableName(SchemaUtil.getTableNameFromFullName(physicalTableName)).build().buildException(); + } + newDesc.setValue(PhoenixTransactionContext.READ_NON_TX_DATA, Boolean.TRUE.toString()); + } } else { // If we think we're creating a non transactional table when it's already // transactional, don't allow. - if (existingDesc.hasCoprocessor(TephraTransactionalProcessor.class.getName())) { + if (hasTxCoprocessor(existingDesc)) { throw new SQLExceptionInfo.Builder(SQLExceptionCode.TX_MAY_NOT_SWITCH_TO_NON_TX) .setSchemaName(SchemaUtil.getSchemaNameFromFullName(physicalTableName)) .setTableName(SchemaUtil.getTableNameFromFullName(physicalTableName)).build().buildException(); @@ -1212,6 +1235,21 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } return null; // will never make it here } + + private static boolean hasTxCoprocessor(HTableDescriptor descriptor) { + for (TransactionFactory.Provider provider : TransactionFactory.Provider.values()) { + Class<? extends RegionObserver> coprocessorClass = provider.getTransactionProvider().getCoprocessor(); + if (coprocessorClass != null && descriptor.hasCoprocessor(coprocessorClass.getName())) { + return true; + } + } + return false; + } + + private static boolean equalTxCoprocessor(TransactionFactory.Provider provider, HTableDescriptor existingDesc, HTableDescriptor newDesc) { + Class<? extends RegionObserver> coprocessorClass = provider.getTransactionProvider().getCoprocessor(); + return (coprocessorClass != null && existingDesc.hasCoprocessor(coprocessorClass.getName()) && newDesc.hasCoprocessor(coprocessorClass.getName())); +} private void modifyTable(byte[] tableName, HTableDescriptor newDesc, boolean shouldPoll) throws IOException, InterruptedException, TimeoutException, SQLException { @@ -1936,6 +1974,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } else { indexTableProps = Maps.newHashMapWithExpectedSize(1); indexTableProps.put(PhoenixTransactionContext.READ_NON_TX_DATA, Boolean.valueOf(txValue)); + indexTableProps.put(PhoenixDatabaseMetaData.TRANSACTION_PROVIDER, tableProps.get(PhoenixDatabaseMetaData.TRANSACTION_PROVIDER)); } for (PTable index : table.getIndexes()) { HTableDescriptor indexDescriptor = admin.getTableDescriptor(index.getPhysicalName().getBytes()); @@ -2046,6 +2085,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement boolean willBeTransactional = false; boolean isOrWillBeTransactional = isTransactional; Integer newTTL = null; + TransactionFactory.Provider txProvider = null; for (String family : properties.keySet()) { List<Pair<String, Object>> propsList = properties.get(family); if (propsList != null && propsList.size() > 0) { @@ -2056,20 +2096,27 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement if ((MetaDataUtil.isHTableProperty(propName) || TableProperty.isPhoenixTableProperty(propName)) && addingColumns) { // setting HTable and PhoenixTable properties while adding a column is not allowed. throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_SET_TABLE_PROPERTY_ADD_COLUMN) - .setMessage("Property: " + propName).build() + .setMessage("Property: " + propName) + .setSchemaName(table.getSchemaName().getString()) + .setTableName(table.getTableName().getString()) + .build() .buildException(); } if (MetaDataUtil.isHTableProperty(propName)) { // Can't have a column family name for a property that's an HTableProperty if (!family.equals(QueryConstants.ALL_FAMILY_PROPERTIES_KEY)) { throw new SQLExceptionInfo.Builder(SQLExceptionCode.COLUMN_FAMILY_NOT_ALLOWED_TABLE_PROPERTY) - .setMessage("Column Family: " + family + ", Property: " + propName).build() + .setMessage("Column Family: " + family + ", Property: " + propName) + .setSchemaName(table.getSchemaName().getString()) + .setTableName(table.getTableName().getString()) + .build() .buildException(); } tableProps.put(propName, propValue); } else { if (TableProperty.isPhoenixTableProperty(propName)) { - TableProperty.valueOf(propName).validate(true, !family.equals(QueryConstants.ALL_FAMILY_PROPERTIES_KEY), table.getType()); + TableProperty tableProp = TableProperty.valueOf(propName); + tableProp.validate(true, !family.equals(QueryConstants.ALL_FAMILY_PROPERTIES_KEY), table.getType()); if (propName.equals(TTL)) { newTTL = ((Number)prop.getSecond()).intValue(); // Even though TTL is really a HColumnProperty we treat it specially. @@ -2078,6 +2125,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } else if (propName.equals(PhoenixDatabaseMetaData.TRANSACTIONAL) && Boolean.TRUE.equals(propValue)) { willBeTransactional = isOrWillBeTransactional = true; tableProps.put(PhoenixTransactionContext.READ_NON_TX_DATA, propValue); + } else if (propName.equals(PhoenixDatabaseMetaData.TRANSACTION_PROVIDER) && propValue != null) { + willBeTransactional = isOrWillBeTransactional = true; + tableProps.put(PhoenixTransactionContext.READ_NON_TX_DATA, Boolean.TRUE); + txProvider = (Provider)TableProperty.TRANSACTION_PROVIDER.getValue(propValue); + tableProps.put(PhoenixDatabaseMetaData.TRANSACTION_PROVIDER, txProvider); } } else { if (MetaDataUtil.isHColumnProperty(propName)) { @@ -2091,12 +2143,26 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement // FIXME: This isn't getting triggered as currently a property gets evaluated // as HTableProp if its neither HColumnProp or PhoenixTableProp. throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ALTER_PROPERTY) - .setMessage("Column Family: " + family + ", Property: " + propName).build() + .setMessage("Column Family: " + family + ", Property: " + propName) + .setSchemaName(table.getSchemaName().getString()) + .setTableName(table.getTableName().getString()) + .build() .buildException(); } } } } + if (isOrWillBeTransactional && newTTL != null) { + TransactionFactory.Provider isOrWillBeTransactionProvider = txProvider == null ? table.getTransactionProvider() : txProvider; + if (isOrWillBeTransactionProvider.getTransactionProvider().isUnsupported(PhoenixTransactionProvider.Feature.SET_TTL)) { + throw new SQLExceptionInfo.Builder(PhoenixTransactionProvider.Feature.SET_TTL.getCode()) + .setMessage(isOrWillBeTransactionProvider.name()) + .setSchemaName(table.getSchemaName().getString()) + .setTableName(table.getTableName().getString()) + .build() + .buildException(); + } + } if (!colFamilyPropsMap.isEmpty()) { stmtFamiliesPropsMap.put(family, colFamilyPropsMap); } @@ -4728,7 +4794,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } @Override - public synchronized PhoenixTransactionClient initTransactionClient(Provider provider) { + public synchronized PhoenixTransactionClient initTransactionClient(Provider provider) throws SQLException { PhoenixTransactionClient client = txClients[provider.ordinal()]; if (client == null) { client = txClients[provider.ordinal()] = provider.getTransactionProvider().getTransactionClient(config, connectionInfo); http://git-wip-us.apache.org/repos/asf/phoenix/blob/fd9af912/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java index 147e873..4be4af8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java @@ -367,7 +367,7 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple } @Override - public PhoenixTransactionClient initTransactionClient(Provider provider) { + public PhoenixTransactionClient initTransactionClient(Provider provider) throws SQLException { return getDelegate().initTransactionClient(provider); }
