Repository: phoenix Updated Branches: refs/heads/txn d905a6662 -> c7700b41d
Support rollback across tables with indexes Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/c7700b41 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/c7700b41 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/c7700b41 Branch: refs/heads/txn Commit: c7700b41d156049228e521da5f964f4c56c905b9 Parents: d905a66 Author: James Taylor <[email protected]> Authored: Wed May 20 01:43:48 2015 -0700 Committer: James Taylor <[email protected]> Committed: Wed May 20 01:43:48 2015 -0700 ---------------------------------------------------------------------- .../end2end/index/BaseMutableIndexIT.java | 1 + .../end2end/index/TxGlobalMutableIndexIT.java | 43 +++ .../execute/DelegateHTableInterface.java | 282 +++++++++++++++++++ .../apache/phoenix/execute/MutationState.java | 161 ++++++++--- .../apache/phoenix/index/IndexMaintainer.java | 30 ++ .../phoenix/index/IndexMetaDataCacheClient.java | 5 +- .../phoenix/index/PhoenixIndexMetaData.java | 8 +- .../index/PhoenixTransactionalIndexer.java | 109 +++++-- .../apache/phoenix/jdbc/PhoenixStatement.java | 8 +- .../apache/phoenix/schema/DelegateTable.java | 4 +- .../java/org/apache/phoenix/schema/PTable.java | 2 +- .../org/apache/phoenix/schema/PTableImpl.java | 3 +- .../java/org/apache/phoenix/util/IndexUtil.java | 20 +- .../java/org/apache/phoenix/util/ScanUtil.java | 6 +- pom.xml | 45 --- 15 files changed, 612 insertions(+), 115 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7700b41/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseMutableIndexIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseMutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseMutableIndexIT.java index c6aadca..2676548 100644 --- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseMutableIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseMutableIndexIT.java @@ -1171,4 +1171,5 @@ public abstract class BaseMutableIndexIT extends BaseHBaseManagedTimeIT { conn.close(); } } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7700b41/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxGlobalMutableIndexIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxGlobalMutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxGlobalMutableIndexIT.java index a2e0412..5196b0a 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxGlobalMutableIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxGlobalMutableIndexIT.java @@ -17,12 +17,24 @@ */ package org.apache.phoenix.end2end.index; +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.Statement; import java.util.Map; +import java.util.Properties; import org.apache.phoenix.end2end.Shadower; import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.ReadOnlyProps; import org.junit.BeforeClass; +import org.junit.Test; import com.google.common.collect.Maps; @@ -39,4 +51,35 @@ public class TxGlobalMutableIndexIT extends GlobalMutableIndexIT { props.put(QueryServices.DEFAULT_TRANSACTIONAL_ATTRIB, Boolean.toString(true)); setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); } + + @Test + public void testRollbackOfUncommittedIndexChange() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); 
+ Connection conn = DriverManager.getConnection(getUrl(), props); + conn.setAutoCommit(false); + try { + Statement stmt = conn.createStatement(); + stmt.execute("CREATE TABLE DEMO(v1 VARCHAR PRIMARY KEY, v2 VARCHAR, v3 VARCHAR)"); + stmt.execute("CREATE INDEX DEMO_idx ON DEMO (v2) INCLUDE(v3)"); + + stmt.executeUpdate("upsert into DEMO values('x', 'y', 'a')"); + + //assert values in data table + ResultSet rs = stmt.executeQuery("select v1, v2, v3 from DEMO"); + assertTrue(rs.next()); + assertEquals("x", rs.getString(1)); + assertEquals("y", rs.getString(2)); + assertEquals("a", rs.getString(3)); + assertFalse(rs.next()); + + conn.rollback(); + + //assert data table is empty after rollback + rs = stmt.executeQuery("select v1, v2, v3 from DEMO"); + assertFalse(rs.next()); + + } finally { + conn.close(); + } + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7700b41/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateHTableInterface.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateHTableInterface.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateHTableInterface.java new file mode 100644 index 0000000..07bdcc8 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateHTableInterface.java @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.execute; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Row; +import org.apache.hadoop.hbase.client.RowMutations; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.coprocessor.Batch.Call; +import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; + +import com.google.protobuf.Descriptors.MethodDescriptor; +import com.google.protobuf.Message; +import com.google.protobuf.Service; +import com.google.protobuf.ServiceException; + +public class DelegateHTableInterface implements HTableInterface { + protected final HTableInterface delegate; + + public DelegateHTableInterface(HTableInterface delegate) { + this.delegate = delegate; + } + + @Override + public byte[] getTableName() { + return delegate.getTableName(); + } + + 
@Override + public TableName getName() { + return delegate.getName(); + } + + @Override + public Configuration getConfiguration() { + return delegate.getConfiguration(); + } + + @Override + public HTableDescriptor getTableDescriptor() throws IOException { + return delegate.getTableDescriptor(); + } + + @Override + public boolean exists(Get get) throws IOException { + return delegate.exists(get); + } + + @Override + public Boolean[] exists(List<Get> gets) throws IOException { + return delegate.exists(gets); + } + + @Override + public void batch(List<? extends Row> actions, Object[] results) throws IOException, InterruptedException { + delegate.batch(actions, results); + } + + @SuppressWarnings("deprecation") + @Override + public Object[] batch(List<? extends Row> actions) throws IOException, InterruptedException { + return delegate.batch(actions); + } + + @Override + public <R> void batchCallback(List<? extends Row> actions, Object[] results, Callback<R> callback) + throws IOException, InterruptedException { + delegate.batchCallback(actions, results, callback); + } + + @SuppressWarnings("deprecation") + @Override + public <R> Object[] batchCallback(List<? 
extends Row> actions, Callback<R> callback) throws IOException, + InterruptedException { + return delegate.batchCallback(actions, callback); + } + + @Override + public Result get(Get get) throws IOException { + return delegate.get(get); + } + + @Override + public Result[] get(List<Get> gets) throws IOException { + return delegate.get(gets); + } + + @SuppressWarnings("deprecation") + @Override + public Result getRowOrBefore(byte[] row, byte[] family) throws IOException { + return delegate.getRowOrBefore(row, family); + } + + @Override + public ResultScanner getScanner(Scan scan) throws IOException { + return delegate.getScanner(scan); + } + + @Override + public ResultScanner getScanner(byte[] family) throws IOException { + return delegate.getScanner(family); + } + + @Override + public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException { + return delegate.getScanner(family, qualifier); + } + + @Override + public void put(Put put) throws IOException { + delegate.put(put); + } + + @Override + public void put(List<Put> puts) throws IOException { + delegate.put(puts); + } + + @Override + public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, byte[] value, Put put) throws IOException { + return delegate.checkAndPut(row, family, qualifier, value, put); + } + + @Override + public void delete(Delete delete) throws IOException { + delegate.delete(delete); + } + + @Override + public void delete(List<Delete> deletes) throws IOException { + delegate.delete(deletes); + } + + @Override + public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, byte[] value, Delete delete) + throws IOException { + return delegate.checkAndDelete(row, family, qualifier, value, delete); + } + + @Override + public void mutateRow(RowMutations rm) throws IOException { + delegate.mutateRow(rm); + } + + @Override + public Result append(Append append) throws IOException { + return delegate.append(append); + } + + @Override + public Result 
increment(Increment increment) throws IOException { + return delegate.increment(increment); + } + + @Override + public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount) throws IOException { + return delegate.incrementColumnValue(row, family, qualifier, amount); + } + + @Override + public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, Durability durability) + throws IOException { + return delegate.incrementColumnValue(row, family, qualifier, amount, durability); + } + + @SuppressWarnings("deprecation") + @Override + public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, boolean writeToWAL) + throws IOException { + return delegate.incrementColumnValue(row, family, qualifier, amount, writeToWAL); + } + + @Override + public boolean isAutoFlush() { + return delegate.isAutoFlush(); + } + + @Override + public void flushCommits() throws IOException { + delegate.flushCommits(); + } + + @Override + public void close() throws IOException { + delegate.close(); + } + + @Override + public CoprocessorRpcChannel coprocessorService(byte[] row) { + return delegate.coprocessorService(row); + } + + @Override + public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service, byte[] startKey, byte[] endKey, + Call<T, R> callable) throws ServiceException, Throwable { + return delegate.coprocessorService(service, startKey, endKey, callable); + } + + @Override + public <T extends Service, R> void coprocessorService(Class<T> service, byte[] startKey, byte[] endKey, + Call<T, R> callable, Callback<R> callback) throws ServiceException, Throwable { + delegate.coprocessorService(service, startKey, endKey, callable, callback); + } + + @SuppressWarnings("deprecation") + @Override + public void setAutoFlush(boolean autoFlush) { + delegate.setAutoFlush(autoFlush); + } + + @Override + public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) { + 
delegate.setAutoFlush(autoFlush, clearBufferOnFail); + } + + @Override + public void setAutoFlushTo(boolean autoFlush) { + delegate.setAutoFlushTo(autoFlush); + } + + @Override + public long getWriteBufferSize() { + return delegate.getWriteBufferSize(); + } + + @Override + public void setWriteBufferSize(long writeBufferSize) throws IOException { + delegate.setWriteBufferSize(writeBufferSize); + } + + @Override + public <R extends Message> Map<byte[], R> batchCoprocessorService(MethodDescriptor methodDescriptor, + Message request, byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable { + return delegate.batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype); + } + + @Override + public <R extends Message> void batchCoprocessorService(MethodDescriptor methodDescriptor, Message request, + byte[] startKey, byte[] endKey, R responsePrototype, Callback<R> callback) throws ServiceException, + Throwable { + delegate.batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype, callback); + } + + @Override + public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOp compareOp, byte[] value, + RowMutations mutation) throws IOException { + return delegate.checkAndMutate(row, family, qualifier, compareOp, value, mutation); + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7700b41/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index b7c7850..4f1a2cd 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -36,8 +36,10 @@ import co.cask.tephra.TransactionSystemClient; import 
co.cask.tephra.hbase98.TransactionAwareHTable; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.cache.ServerCacheClient; @@ -58,6 +60,8 @@ import org.apache.phoenix.schema.MetaDataClient; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PRow; import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTable.IndexType; +import org.apache.phoenix.schema.PTableKey; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.trace.util.Tracing; @@ -90,7 +94,7 @@ public class MutationState implements SQLCloseable { private PhoenixConnection connection; private final long maxSize; - private final ImmutableBytesPtr tempPtr = new ImmutableBytesPtr(); + private final ImmutableBytesWritable tempPtr = new ImmutableBytesWritable(); // map from table to rows // rows - map from rowkey to columns // columns - map from column to value @@ -428,11 +432,76 @@ public class MutationState implements SQLCloseable { } } + private boolean hasKeyValueColumn(PTable table, PTable index) { + IndexMaintainer maintainer = index.getIndexMaintainer(table, connection); + return !maintainer.getAllColumns().isEmpty(); + } + + private void divideImmutableIndexes(Iterator<PTable> enabledImmutableIndexes, PTable table, List<PTable> rowKeyIndexes, List<PTable> keyValueIndexes) { + while (enabledImmutableIndexes.hasNext()) { + PTable index = enabledImmutableIndexes.next(); + if (index.getIndexType() != IndexType.LOCAL) { + if (hasKeyValueColumn(table, index)) { + keyValueIndexes.add(index); + } else { + rowKeyIndexes.add(index); + } + } + } + } + private class MetaDataAwareHTable extends 
DelegateHTableInterface { + private final TableRef tableRef; + + private MetaDataAwareHTable(HTableInterface delegate, TableRef tableRef) { + super(delegate); + this.tableRef = tableRef; + } + + @Override + public void delete(List<Delete> deletes) throws IOException { + try { + PTable table = tableRef.getTable(); + List<PTable> indexes = table.getIndexes(); + Iterator<PTable> enabledIndexes = IndexMaintainer.nonDisabledIndexIterator(indexes.iterator()); + if (enabledIndexes.hasNext()) { + List<PTable> keyValueIndexes = Collections.emptyList(); + ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable(); + boolean attachMetaData = table.getIndexMaintainers(indexMetaDataPtr, connection); + if (table.isImmutableRows()) { + List<PTable> rowKeyIndexes = Lists.newArrayListWithExpectedSize(indexes.size()); + keyValueIndexes = Lists.newArrayListWithExpectedSize(indexes.size()); + divideImmutableIndexes(enabledIndexes, table, rowKeyIndexes, keyValueIndexes); + // Generate index deletes for immutable indexes that only reference row key + // columns and submit directly here. + for (PTable index : rowKeyIndexes) { + List<Delete> indexDeletes = IndexUtil.generateDeleteIndexData(table, index, deletes, tempPtr, connection.getKeyValueBuilder(), connection); + HTableInterface hindex = connection.getQueryServices().getTable(index.getPhysicalName().getBytes()); + hindex.delete(indexDeletes); + } + } + + // If we have mutable indexes, local immutable indexes, or global immutable indexes + // that reference key value columns, setup index meta data and attach here. In this + // case updates to the indexes will be generated on the server side. 
+ if (!keyValueIndexes.isEmpty()) { + attachMetaData = true; + IndexMaintainer.serializeAdditional(table, indexMetaDataPtr, keyValueIndexes, connection); + } + if (attachMetaData) { + setMetaDataOnMutations(tableRef, deletes, indexMetaDataPtr); + } + } + delegate.delete(deletes); + } catch (SQLException e) { + throw new IOException(e); + } + } + } + @SuppressWarnings("deprecation") private void send(Iterator<TableRef> tableRefIterator) throws SQLException { int i = 0; long[] serverTimeStamps = null; - byte[] tenantId = connection.getTenantId() == null ? null : connection.getTenantId().getBytes(); // Validate up front if not transactional so that we if (tableRefIterator == null) { serverTimeStamps = validateAll(); @@ -449,8 +518,7 @@ public class MutationState implements SQLCloseable { continue; } PTable table = tableRef.getTable(); - table.getIndexMaintainers(tempPtr, connection); - boolean hasIndexMaintainers = tempPtr.getLength() > 0; + boolean hasIndexMaintainers = table.getIndexMaintainers(tempPtr, connection); boolean isDataTable = true; // Validate as we go if transactional since we can undo if a problem occurs (which is unlikely) long serverTimestamp = serverTimeStamps == null ? 
validate(tableRef, valuesMap) : serverTimeStamps[i++]; @@ -469,43 +537,23 @@ public class MutationState implements SQLCloseable { do { ServerCache cache = null; if (hasIndexMaintainers && isDataTable) { - byte[] attribValue = null; - byte[] uuidValue; - byte[] txState = ByteUtil.EMPTY_BYTE_ARRAY; - if (table.isTransactional()) { - txState = TransactionUtil.encodeTxnState(getTransaction()); - } - if (IndexMetaDataCacheClient.useIndexMetadataCache(connection, mutations, tempPtr.getLength() + txState.length)) { - IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef); - cache = client.addIndexMetadataCache(mutations, tempPtr, txState); - child.addTimelineAnnotation("Updated index metadata cache"); - uuidValue = cache.getId(); - // If we haven't retried yet, retry for this case only, as it's possible that - // a split will occur after we send the index metadata cache to all known - // region servers. - shouldRetry = true; - } else { - attribValue = ByteUtil.copyKeyBytesIfNecessary(tempPtr); - uuidValue = ServerCacheClient.generateId(); - } - // Either set the UUID to be able to access the index metadata from the cache - // or set the index metadata directly on the Mutation - for (Mutation mutation : mutations) { - if (tenantId != null) { - mutation.setAttribute(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId); - } - mutation.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue); - if (attribValue != null) { - mutation.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue); - mutation.setAttribute(BaseScannerRegionObserver.TX_STATE, txState); - } - } + cache = setMetaDataOnMutations(tableRef, mutations, tempPtr); } + // If we haven't retried yet, retry for this case only, as it's possible that + // a split will occur after we send the index metadata cache to all known + // region servers. 
+ shouldRetry = cache != null; SQLException sqlE = null; HTableInterface hTable = connection.getQueryServices().getTable(htableName); try { if (table.isTransactional()) { + // If we have indexes, wrap the HTable in a delegate HTable that + // will attach the necessary index meta data in the event of a + // rollback + if (!table.getIndexes().isEmpty()) { + hTable = new MetaDataAwareHTable(hTable, tableRef); + } TransactionAwareHTable txnAware = TransactionUtil.getTransactionAwareHTable(hTable); // Don't add immutable indexes (those are the only ones that would participate // during a commit), as we don't need conflict detection for these. @@ -580,6 +628,40 @@ public class MutationState implements SQLCloseable { assert(numRows==0); assert(this.mutations.isEmpty()); } + + private ServerCache setMetaDataOnMutations(TableRef tableRef, List<? extends Mutation> mutations, + ImmutableBytesWritable indexMetaDataPtr) throws SQLException { + PTable table = tableRef.getTable(); + byte[] tenantId = connection.getTenantId() == null ? 
null : connection.getTenantId().getBytes(); + ServerCache cache = null; + byte[] attribValue = null; + byte[] uuidValue; + byte[] txState = ByteUtil.EMPTY_BYTE_ARRAY; + if (table.isTransactional()) { + txState = TransactionUtil.encodeTxnState(getTransaction()); + } + if (IndexMetaDataCacheClient.useIndexMetadataCache(connection, mutations, indexMetaDataPtr.getLength() + txState.length)) { + IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef); + cache = client.addIndexMetadataCache(mutations, indexMetaDataPtr, txState); + uuidValue = cache.getId(); + } else { + attribValue = ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr); + uuidValue = ServerCacheClient.generateId(); + } + // Either set the UUID to be able to access the index metadata from the cache + // or set the index metadata directly on the Mutation + for (Mutation mutation : mutations) { + if (tenantId != null) { + mutation.setAttribute(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId); + } + mutation.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue); + if (attribValue != null) { + mutation.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue); + mutation.setAttribute(BaseScannerRegionObserver.TX_STATE, txState); + } + } + return cache; + } public void clear() throws SQLException { this.mutations.clear(); @@ -649,7 +731,14 @@ public class MutationState implements SQLCloseable { // We really should be keying the tables based on the physical table name. List<TableRef> strippedAliases = Lists.newArrayListWithExpectedSize(mutations.keySet().size()); while (filteredTableRefs.hasNext()) { - strippedAliases.add(new TableRef(filteredTableRefs.next(), null)); + /* + * We'll have a PROJECTED table here, but we need the TABLE instead as otherwise we can't + * get the cf:cq which we need for IndexMaintainer. 
+ */ + TableRef tableRef = filteredTableRefs.next(); + PTable projectedTable = tableRef.getTable(); + PTable nonProjectedTable = connection.getMetaDataCache().getTable(new PTableKey(projectedTable.getTenantId(), projectedTable.getName().getString())); + strippedAliases.add(new TableRef(null, nonProjectedTable, tableRef.getTimeStamp(), tableRef.getLowerBoundTimeStamp(), tableRef.hasDynamicCols())); } startTransaction(); send(strippedAliases.iterator()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7700b41/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java index 1be0aa3..be668d6 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java @@ -194,6 +194,35 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { ptr.set(stream.getBuffer(), 0, stream.size()); } + + public static void serializeAdditional(PTable table, ImmutableBytesWritable indexMetaDataPtr, + List<PTable> keyValueIndexes, PhoenixConnection connection) { + int nMutableIndexes = indexMetaDataPtr.getLength() == 0 ? 
0 : ByteUtil.vintFromBytes(indexMetaDataPtr); + int nIndexes = nMutableIndexes + keyValueIndexes.size(); + int estimatedSize = indexMetaDataPtr.getLength() + 1; // Just in case new size increases buffer + for (PTable index : keyValueIndexes) { + estimatedSize += index.getIndexMaintainer(table, connection).getEstimatedByteSize(); + } + TrustedByteArrayOutputStream stream = new TrustedByteArrayOutputStream(estimatedSize + 1); + DataOutput output = new DataOutputStream(stream); + try { + // Encode data table salting in sign of number of indexes + WritableUtils.writeVInt(output, nIndexes * (table.getBucketNum() == null ? 1 : -1)); + // Serialize current mutable indexes, subtracting the vint size from the length + // as it's still included + if (indexMetaDataPtr.getLength() > 0) { + output.write(indexMetaDataPtr.get(), indexMetaDataPtr.getOffset(), indexMetaDataPtr.getLength()-WritableUtils.getVIntSize(nMutableIndexes)); + } + // Serialize the additional key value indexes afterwards + for (PTable index : keyValueIndexes) { + index.getIndexMaintainer(table, connection).write(output); + } + } catch (IOException e) { + throw new RuntimeException(e); // Impossible + } + indexMetaDataPtr.set(stream.getBuffer(), 0, stream.size()); + } + public static List<IndexMaintainer> deserialize(ImmutableBytesWritable metaDataPtr, KeyValueBuilder builder) { return deserialize(metaDataPtr.get(), metaDataPtr.getOffset(), metaDataPtr.getLength()); @@ -264,6 +293,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { private IndexMaintainer(PTable dataTable, PTable index, PhoenixConnection connection) { this(dataTable.getRowKeySchema(), dataTable.getBucketNum() != null); + assert(dataTable.getType() == PTableType.SYSTEM || dataTable.getType() == PTableType.TABLE || dataTable.getType() == PTableType.VIEW); this.isMultiTenant = dataTable.isMultiTenant(); this.viewIndexId = index.getViewIndexId() == null ? 
null : MetaDataUtil.getViewIndexIdDataType().toBytes(index.getViewIndexId()); this.isLocalIndex = index.getIndexType() == IndexType.LOCAL; http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7700b41/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java index c1135bc..1b0c599 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java @@ -24,7 +24,6 @@ import java.util.List; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; - import org.apache.phoenix.cache.ServerCacheClient; import org.apache.phoenix.cache.ServerCacheClient.ServerCache; import org.apache.phoenix.compile.ScanRanges; @@ -59,7 +58,7 @@ public class IndexMetaDataCacheClient { * @param mutations the list of mutations that will be sent in a batch to server * @param indexMetaDataByteLength length in bytes of the index metadata cache */ - public static boolean useIndexMetadataCache(PhoenixConnection connection, List<Mutation> mutations, int indexMetaDataByteLength) { + public static boolean useIndexMetadataCache(PhoenixConnection connection, List<? 
extends Mutation> mutations, int indexMetaDataByteLength) { ReadOnlyProps props = connection.getQueryServices().getProps(); int threshold = props.getInt(INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB, QueryServicesOptions.DEFAULT_INDEX_MUTATE_BATCH_SIZE_THRESHOLD); return (indexMetaDataByteLength > ServerCacheClient.UUID_LENGTH && mutations.size() > threshold); @@ -73,7 +72,7 @@ public class IndexMetaDataCacheClient { * @throws MaxServerCacheSizeExceededException if size of hash cache exceeds max allowed * size */ - public ServerCache addIndexMetadataCache(List<Mutation> mutations, ImmutableBytesWritable ptr, byte[] txState) throws SQLException { + public ServerCache addIndexMetadataCache(List<? extends Mutation> mutations, ImmutableBytesWritable ptr, byte[] txState) throws SQLException { /** * Serialize and compress hashCacheTable */ http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7700b41/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java index 26c1c12..ff0eb14 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java @@ -40,6 +40,7 @@ import org.apache.phoenix.util.ServerUtil; import org.apache.phoenix.util.TransactionUtil; public class PhoenixIndexMetaData implements IndexMetaData { + private final Map<String, byte[]> attributes; private final IndexMetaDataCache indexMetaDataCache; private static IndexMetaDataCache getIndexMetaData(RegionCoprocessorEnvironment env, Map<String, byte[]> attributes) throws IOException { @@ -84,7 +85,8 @@ public class PhoenixIndexMetaData implements IndexMetaData { } public PhoenixIndexMetaData(RegionCoprocessorEnvironment env, Map<String,byte[]> 
attributes) throws IOException { - indexMetaDataCache = getIndexMetaData(env, attributes); + this.indexMetaDataCache = getIndexMetaData(env, attributes); + this.attributes = attributes; } public Transaction getTransaction() { @@ -94,4 +96,8 @@ public class PhoenixIndexMetaData implements IndexMetaData { public List<IndexMaintainer> getIndexMaintainers() { return indexMetaDataCache.getIndexMaintainers(); } + + public Map<String, byte[]> getAttributes() { + return attributes; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7700b41/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java index 6f1e28c..862c4ba 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java @@ -14,11 +14,13 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import co.cask.tephra.Transaction; +import co.cask.tephra.TxConstants; import co.cask.tephra.hbase98.TransactionAwareHTable; import org.apache.commons.logging.Log; @@ -28,6 +30,8 @@ import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Result; @@ -40,6 +44,7 @@ import 
org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.compile.ScanRanges; import org.apache.phoenix.hbase.index.MultiMutation; @@ -64,6 +69,7 @@ import org.cloudera.htrace.Span; import org.cloudera.htrace.Trace; import org.cloudera.htrace.TraceScope; +import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -101,16 +107,39 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { this.writer.stop(msg); } + private static Iterator<Mutation> getMutationIterator(final MiniBatchOperationInProgress<Mutation> miniBatchOp) { + return new Iterator<Mutation>() { + private int i = 0; + + @Override + public boolean hasNext() { + return i < miniBatchOp.size(); + } + + @Override + public Mutation next() { + return miniBatchOp.getOperation(i++); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + }; + } @Override public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { Mutation m = miniBatchOp.getOperation(0); - if (!codec.isEnabled(m)) { + if (!codec.isEnabled(m) || m.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY) != null) { super.preBatchMutate(c, miniBatchOp); return; } + Map<String,byte[]> updateAttributes = m.getAttributesMap(); + PhoenixIndexMetaData indexMetaData = new PhoenixIndexMetaData(c.getEnvironment(),updateAttributes); Collection<Pair<Mutation, byte[]>> indexUpdates = null; // get the current span, or just use a null-span to avoid a bunch of if statements try (TraceScope scope = Trace.startSpan("Starting to 
build index updates")) { @@ -120,7 +149,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { } // get the index updates for all elements in this batch - indexUpdates = getIndexUpdates(c.getEnvironment(), miniBatchOp); + indexUpdates = getIndexUpdates(c.getEnvironment(), indexMetaData, getMutationIterator(miniBatchOp), false); current.addTimelineAnnotation("Built index updates, doing preStep"); TracingUtils.addAnnotation(current, "index update count", indexUpdates.size()); @@ -159,29 +188,41 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { return s; } - private Collection<Pair<Mutation, byte[]>> getIndexUpdates(RegionCoprocessorEnvironment env, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { - // Collect the set of mutable ColumnReferences so that we can first - // run a scan to get the current state. We'll need this to delete - // the existing index rows. - Map<String,byte[]> updateAttributes = miniBatchOp.getOperation(0).getAttributesMap(); - PhoenixIndexMetaData indexMetaData = new PhoenixIndexMetaData(env,updateAttributes); - Transaction tx = indexMetaData.getTransaction(); - assert(tx != null); - List<IndexMaintainer> indexMaintainers = indexMetaData.getIndexMaintainers(); - Set<ColumnReference> mutableColumns = Sets.newHashSetWithExpectedSize(indexMaintainers.size() * 10); - for (IndexMaintainer indexMaintainer : indexMaintainers) { - if (!indexMaintainer.isImmutableRows()) { - mutableColumns.addAll(indexMaintainer.getAllColumns()); + @Override + public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> e, final Delete delete, + final WALEdit edit, final Durability durability) throws IOException { + + // Need to do this in preDelete as otherwise our scan won't see the old values unless + // we do a raw scan. 
+ if (delete.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY) == null || !codec.isEnabled(delete)) { + super.preDelete(e, delete, edit, durability); + return; + } + Map<String,byte[]> updateAttributes = delete.getAttributesMap(); + PhoenixIndexMetaData indexMetaData = new PhoenixIndexMetaData(e.getEnvironment(),updateAttributes); + Collection<Pair<Mutation, byte[]>> indexUpdates = null; + try { + indexUpdates = getIndexUpdates(e.getEnvironment(), indexMetaData, Iterators.<Mutation>singletonIterator(delete), true); + // no index updates, so we are done + if (!indexUpdates.isEmpty()) { + this.writer.write(indexUpdates); } + } catch (Throwable t) { + String msg = "Failed to rollback index updates: " + indexUpdates; + LOG.error(msg, t); + ServerUtil.throwIOException(msg, t); } + } + + private Collection<Pair<Mutation, byte[]>> getIndexUpdates(RegionCoprocessorEnvironment env, PhoenixIndexMetaData indexMetaData, Iterator<Mutation> mutationIterator, boolean readOwnWrites) throws IOException { ResultScanner scanner = null; TransactionAwareHTable txTable = null; // Collect up all mutations in batch Map<ImmutableBytesPtr, MultiMutation> mutations = new HashMap<ImmutableBytesPtr, MultiMutation>(); - for (int i = 0; i < miniBatchOp.size(); i++) { - Mutation m = miniBatchOp.getOperation(i); + while(mutationIterator.hasNext()) { + Mutation m = mutationIterator.next(); // add the mutation to the batch set ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow()); MultiMutation stored = mutations.get(row); @@ -193,6 +234,19 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { stored.addAll(m); } + // Collect the set of mutable ColumnReferences so that we can first + // run a scan to get the current state. We'll need this to delete + // the existing index rows. 
+ Transaction tx = indexMetaData.getTransaction(); + assert(tx != null); + List<IndexMaintainer> indexMaintainers = indexMetaData.getIndexMaintainers(); + Set<ColumnReference> mutableColumns = Sets.newHashSetWithExpectedSize(indexMaintainers.size() * 10); + for (IndexMaintainer indexMaintainer : indexMaintainers) { + if (!indexMaintainer.isImmutableRows()) { + mutableColumns.addAll(indexMaintainer.getAllColumns()); + } + } + Collection<Pair<Mutation, byte[]>> indexUpdates = new ArrayList<Pair<Mutation, byte[]>>(mutations.size() * 2 * indexMaintainers.size()); try { if (!mutableColumns.isEmpty()) { @@ -201,7 +255,9 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { keys.add(PVarbinary.INSTANCE.getKeyRange(ptr.copyBytesIfNecessary())); } Scan scan = new Scan(); - scan.setAttribute(TX_NO_READ_OWN_WRITES, PDataType.TRUE_BYTES); // TODO: remove when Tephra allows this + if (!readOwnWrites) { + scan.setAttribute(TX_NO_READ_OWN_WRITES, PDataType.TRUE_BYTES); // TODO: remove when Tephra allows this + } // Project all mutable columns for (ColumnReference ref : mutableColumns) { scan.addColumn(ref.getFamily(), ref.getQualifier()); @@ -217,14 +273,17 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { txTable.startTx(tx); scanner = txTable.getScanner(scan); } + ColumnReference emptyColRef = new ColumnReference(indexMaintainers.get(0).getDataEmptyKeyValueCF(), QueryConstants.EMPTY_COLUMN_BYTES); if (scanner != null) { Result result; while ((result = scanner.next()) != null) { Mutation m = mutations.remove(new ImmutableBytesPtr(result.getRow())); - TxTableState state = new TxTableState(env, mutableColumns, updateAttributes, tx.getWritePointer(), m, result); + byte[] attribValue = m.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY); + TxTableState state = new TxTableState(env, mutableColumns, indexMetaData.getAttributes(), tx.getWritePointer(), m, emptyColRef, result); Iterable<IndexUpdate> deletes = codec.getIndexDeletes(state, 
indexMetaData); for (IndexUpdate delete : deletes) { if (delete.isValid()) { + delete.getUpdate().setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, attribValue); indexUpdates.add(new Pair<Mutation, byte[]>(delete.getUpdate(),delete.getTableName())); } } @@ -238,7 +297,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { } } for (Mutation m : mutations.values()) { - TxTableState state = new TxTableState(env, mutableColumns, updateAttributes, tx.getWritePointer(), m); + TxTableState state = new TxTableState(env, mutableColumns, indexMetaData.getAttributes(), tx.getWritePointer(), m); state.applyMutation(); Iterable<IndexUpdate> puts = codec.getIndexUpserts(state, indexMetaData); for (IndexUpdate put : puts) { @@ -284,7 +343,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { } } - public TxTableState(RegionCoprocessorEnvironment env, Set<ColumnReference> indexedColumns, Map<String, byte[]> attributes, long currentTimestamp, Mutation m, Result r) { + public TxTableState(RegionCoprocessorEnvironment env, Set<ColumnReference> indexedColumns, Map<String, byte[]> attributes, long currentTimestamp, Mutation m, ColumnReference emptyColRef, Result r) { this(env, indexedColumns, attributes, currentTimestamp, m); for (ColumnReference ref : indexedColumns) { @@ -295,6 +354,14 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { valueMap.put(ref, ptr); } } + /* + Cell cell = r.getColumnLatestCell(emptyColRef.getFamily(), emptyColRef.getQualifier()); + if (cell != null) { + ImmutableBytesWritable ptr = new ImmutableBytesWritable(); + ptr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); + valueMap.put(emptyColRef, ptr); + } + */ } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7700b41/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java ---------------------------------------------------------------------- diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java index ef2a233..19beecd 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java @@ -33,6 +33,7 @@ import java.text.Format; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Iterator; import java.util.List; import org.apache.hadoop.hbase.client.Scan; @@ -134,6 +135,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Throwables; +import com.google.common.collect.Iterators; import com.google.common.collect.ListMultimap; import com.google.common.collect.Lists; /** @@ -236,7 +238,11 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho QueryPlan plan = stmt.compilePlan(PhoenixStatement.this, Sequence.ValueOp.RESERVE_SEQUENCE); // Send mutations to hbase, so they are visible to subsequent reads. // Use original plan for data table so that data and immutable indexes will be sent - boolean isTransactional = connection.getMutationState().startTransaction(plan.getContext().getResolver().getTables().iterator()); + // TODO: for joins, we need to iterate through all tables, but we need the original table, + // not the projected table, so plan.getContext().getResolver().getTables() won't work. + TableRef tableRef = plan.getTableRef(); + Iterator<TableRef> tableRefs = tableRef == null ? 
Iterators.<TableRef>emptyIterator() : Iterators.singletonIterator(tableRef); + boolean isTransactional = connection.getMutationState().startTransaction(tableRefs); plan = connection.getQueryServices().getOptimizer().optimize(PhoenixStatement.this, plan); if (isTransactional) { // After optimize so that we have the right context object http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7700b41/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java index 89c2283..98f9f4a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java @@ -162,8 +162,8 @@ public class DelegateTable implements PTable { } @Override - public void getIndexMaintainers(ImmutableBytesWritable ptr, PhoenixConnection connection) { - delegate.getIndexMaintainers(ptr, connection); + public boolean getIndexMaintainers(ImmutableBytesWritable ptr, PhoenixConnection connection) { + return delegate.getIndexMaintainers(ptr, connection); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7700b41/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java index 6b7f8c6..4b4729f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java @@ -307,7 +307,7 @@ public interface PTable { PName getPhysicalName(); boolean isImmutableRows(); - void getIndexMaintainers(ImmutableBytesWritable ptr, PhoenixConnection connection); + boolean 
getIndexMaintainers(ImmutableBytesWritable ptr, PhoenixConnection connection); IndexMaintainer getIndexMaintainer(PTable dataTable, PhoenixConnection connection); PName getDefaultFamilyName(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7700b41/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java index 90c68fb..a32e922 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java @@ -830,7 +830,7 @@ public class PTableImpl implements PTable { } @Override - public synchronized void getIndexMaintainers(ImmutableBytesWritable ptr, PhoenixConnection connection) { + public synchronized boolean getIndexMaintainers(ImmutableBytesWritable ptr, PhoenixConnection connection) { if (indexMaintainersPtr == null) { indexMaintainersPtr = new ImmutableBytesWritable(); if (indexes.isEmpty()) { @@ -840,6 +840,7 @@ public class PTableImpl implements PTable { } } ptr.set(indexMaintainersPtr.get(), indexMaintainersPtr.getOffset(), indexMaintainersPtr.getLength()); + return indexMaintainersPtr.getLength() > 0; } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7700b41/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java index 45729a2..fd111e0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java @@ -30,6 +30,7 @@ import java.util.Map; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.KeyValue; 
import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Mutation; @@ -209,13 +210,30 @@ public class IndexUtil { .getLength()) == 0); } + public static List<Delete> generateDeleteIndexData(final PTable table, PTable index, + List<Delete> dataMutations, ImmutableBytesWritable ptr, final KeyValueBuilder kvBuilder, PhoenixConnection connection) + throws SQLException { + try { + IndexMaintainer maintainer = index.getIndexMaintainer(table, connection); + List<Delete> indexMutations = Lists.newArrayListWithExpectedSize(dataMutations.size()); + for (final Mutation dataMutation : dataMutations) { + long ts = MetaDataUtil.getClientTimeStamp(dataMutation); + ptr.set(dataMutation.getRow()); + indexMutations.add(maintainer.buildDeleteMutation(kvBuilder, ptr, ts)); + } + return indexMutations; + } catch (IOException e) { + throw new SQLException(e); + } + } + public static List<Mutation> generateIndexData(final PTable table, PTable index, List<Mutation> dataMutations, ImmutableBytesWritable ptr, final KeyValueBuilder kvBuilder, PhoenixConnection connection) throws SQLException { try { IndexMaintainer maintainer = index.getIndexMaintainer(table, connection); List<Mutation> indexMutations = Lists.newArrayListWithExpectedSize(dataMutations.size()); - for (final Mutation dataMutation : dataMutations) { + for (final Mutation dataMutation : dataMutations) { long ts = MetaDataUtil.getClientTimeStamp(dataMutation); ptr.set(dataMutation.getRow()); /* http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7700b41/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java index 2dfa573..3786e6d 100644 --- 
a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java @@ -45,12 +45,12 @@ import org.apache.phoenix.filter.SkipScanFilter; import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.query.KeyRange.Bound; import org.apache.phoenix.query.QueryConstants; -import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PNameFactory; -import org.apache.phoenix.schema.types.PVarbinary; import org.apache.phoenix.schema.RowKeySchema; import org.apache.phoenix.schema.ValueSchema.Field; +import org.apache.phoenix.schema.types.PDataType; +import org.apache.phoenix.schema.types.PVarbinary; import com.google.common.collect.Lists; @@ -434,7 +434,7 @@ public class ScanUtil { } } - public static ScanRanges newScanRanges(List<Mutation> mutations) throws SQLException { + public static ScanRanges newScanRanges(List<? extends Mutation> mutations) throws SQLException { List<KeyRange> keys = Lists.newArrayListWithExpectedSize(mutations.size()); for (Mutation m : mutations) { keys.add(PVarbinary.INSTANCE.getKeyRange(m.getRow())); http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7700b41/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 90eba1d..9ace160 100644 --- a/pom.xml +++ b/pom.xml @@ -116,7 +116,6 @@ <maven-dependency-plugin.version>2.1</maven-dependency-plugin.version> <maven.assembly.version>2.5.2</maven.assembly.version> - <maven.rat.version>0.8</maven.rat.version> <!-- Plugin options --> <numForkedUT>3</numForkedUT> @@ -174,38 +173,6 @@ <artifactId>maven-assembly-plugin</artifactId> <version>${maven.assembly.version}</version> </plugin> - <plugin> - <groupId>org.apache.rat</groupId> - <artifactId>apache-rat-plugin</artifactId> - <version>${maven.rat.version}</version> - <configuration> - <excludes> - <exclude>CHANGES</exclude> - <exclude>README.md</exclude> - 
<exclude>README</exclude> - <exclude>dev/phoenix.importorder</exclude> - <exclude>dev/release_files/**</exclude> - <exclude>**/target/**</exclude> - <exclude>**/*.versionsBackup</exclude> - <!-- exclude docs --> - <exclude>docs/**</exclude> - <!-- exclude examples --> - <exclude>examples/**</exclude> - <!-- exclude source control files --> - <exclude>.gitignore</exclude> - <exclude>.git/**</exclude> - <!-- exclude IDE files --> - <exclude>**/.idea/**</exclude> - <exclude>**/*.iml</exclude> - <exclude>.project</exclude> - <exclude>.classpath</exclude> - <exclude>.settings/**</exclude> - <exclude>**/resources/java.sql.Driver</exclude> - <!-- exclude protobuf files --> - <exclude>**/generated/**</exclude> - </excludes> - </configuration> - </plugin> <!-- We put slow-running tests into src/it and run them during the integration-test phase using the failsafe plugin. This way developers can run unit tests conveniently from the IDE or via @@ -661,18 +628,6 @@ <id>release</id> <build> <plugins> - <plugin> - <groupId>org.apache.rat</groupId> - <artifactId>apache-rat-plugin</artifactId> - <executions> - <execution> - <phase>package</phase> - <goals> - <goal>check</goal> - </goals> - </execution> - </executions> - </plugin> </plugins> </build> </profile>
