Repository: phoenix
Updated Branches:
  refs/heads/txn 0ad5c5675 -> e1521ab05


http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
new file mode 100644
index 0000000..dd71a58
--- /dev/null
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+ * License. You may obtain a copy of the License at 
http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ * applicable law or agreed to in writing, software distributed under the 
License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language
+ * governing permissions and limitations under the License.
+ */
+package org.apache.phoenix.index;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import co.cask.tephra.Transaction;
+import co.cask.tephra.hbase98.TransactionAwareHTable;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.hbase.index.MultiMutation;
+import org.apache.phoenix.hbase.index.ValueGetter;
+import org.apache.phoenix.hbase.index.covered.IndexUpdate;
+import org.apache.phoenix.hbase.index.covered.TableState;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
+import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
+import org.apache.phoenix.hbase.index.write.IndexWriter;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.schema.types.PVarbinary;
+import org.apache.phoenix.trace.TracingUtils;
+import org.apache.phoenix.trace.util.NullSpan;
+import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.cloudera.htrace.Span;
+import org.cloudera.htrace.Trace;
+import org.cloudera.htrace.TraceScope;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * Do all the work of managing index updates for a transactional table from a 
single coprocessor. Since the transaction
+ * manager essentially time orders writes through conflict detection, the 
logic to maintain a secondary index is quite a
+ * bit simpler than the non transactional case. For example, there's no need 
to muck with the WAL, as failure scenarios
+ * are handled by aborting the transaction.
+ */
+public class PhoenixTransactionalIndexer extends BaseRegionObserver {
+
+    private static final Log LOG = 
LogFactory.getLog(PhoenixTransactionalIndexer.class);
+
+    private PhoenixIndexCodec codec;
+    private IndexWriter writer;
+    private boolean stopped;
+
+    @Override
+    public void start(CoprocessorEnvironment e) throws IOException {
+        final RegionCoprocessorEnvironment env = 
(RegionCoprocessorEnvironment)e;
+        String serverName = 
env.getRegionServerServices().getServerName().getServerName();
+        codec = new PhoenixIndexCodec();
+        codec.initialize(env);
+
+        // setup the actual index writer
+        this.writer = new IndexWriter(env, serverName + "-tx-index-writer");
+    }
+
+    @Override
+    public void stop(CoprocessorEnvironment e) throws IOException {
+        if (this.stopped) { return; }
+        this.stopped = true;
+        String msg = "TxIndexer is being stopped";
+        this.writer.stop(msg);
+    }
+
+    @Override
+    public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
+            MiniBatchOperationInProgress<Mutation> miniBatchOp) throws 
IOException {
+
+        Mutation m = miniBatchOp.getOperation(0);
+        if (!codec.isEnabled(m)) {
+            super.preBatchMutate(c, miniBatchOp);
+            return;
+        }
+
+        Collection<Pair<Mutation, byte[]>> indexUpdates = null;
+        // get the current span, or just use a null-span to avoid a bunch of 
if statements
+        try (TraceScope scope = Trace.startSpan("Starting to build index 
updates")) {
+            Span current = scope.getSpan();
+            if (current == null) {
+                current = NullSpan.INSTANCE;
+            }
+
+            // get the index updates for all elements in this batch
+            indexUpdates = getIndexUpdates(c.getEnvironment(), miniBatchOp);
+
+            current.addTimelineAnnotation("Built index updates, doing 
preStep");
+            TracingUtils.addAnnotation(current, "index update count", 
indexUpdates.size());
+
+            // no index updates, so we are done
+            if (!indexUpdates.isEmpty()) {
+                this.writer.write(indexUpdates);
+            }
+        } catch (Throwable t) {
+            LOG.error("Failed to update index with entries:" + indexUpdates, 
t);
+            IndexManagementUtil.rethrowIndexingException(t);
+        }
+    }
+
+    private Collection<Pair<Mutation, byte[]>> 
getIndexUpdates(RegionCoprocessorEnvironment env, 
MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+        // Collect the set of mutable ColumnReferences so that we can first
+        // run a scan to get the current state. We'll need this to delete
+        // the existing index rows.
+        Map<String,byte[]> updateAttributes = 
miniBatchOp.getOperation(0).getAttributesMap();
+        PhoenixIndexMetaData indexMetaData = new 
PhoenixIndexMetaData(env,updateAttributes);
+        Transaction tx = indexMetaData.getTransaction();
+        assert(tx != null);
+        List<IndexMaintainer> indexMaintainers = 
indexMetaData.getIndexMaintainers();
+        Set<ColumnReference> mutableColumns = 
Sets.newHashSetWithExpectedSize(indexMaintainers.size() * 10);
+        for (IndexMaintainer indexMaintainer : indexMaintainers) {
+            if (!indexMaintainer.isImmutableRows()) {
+                mutableColumns.addAll(indexMaintainer.getAllColumns());
+            }
+        }
+        ResultScanner scanner = null;
+        TransactionAwareHTable txTable = null;
+        
+        // Collect up all mutations in batch
+        Map<ImmutableBytesPtr, MultiMutation> mutations =
+                new HashMap<ImmutableBytesPtr, MultiMutation>();
+        for (int i = 0; i < miniBatchOp.size(); i++) {
+            Mutation m = miniBatchOp.getOperation(i);
+            // add the mutation to the batch set
+            ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
+            MultiMutation stored = mutations.get(row);
+            // we haven't seen this row before, so add it
+            if (stored == null) {
+                stored = new MultiMutation(row);
+                mutations.put(row, stored);
+            }
+            stored.addAll(m);
+        }
+        
+        try {
+            if (!mutableColumns.isEmpty()) {
+                List<KeyRange> keys = 
Lists.newArrayListWithExpectedSize(mutations.size());
+                for (ImmutableBytesPtr ptr : mutations.keySet()) {
+                    
keys.add(PVarbinary.INSTANCE.getKeyRange(ptr.copyBytesIfNecessary()));
+                }
+                Scan scan = new Scan();
+                ScanRanges scanRanges = 
ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, 
Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN);
+                scanRanges.initializeScan(scan);
+                scan.setFilter(scanRanges.getSkipScanFilter());
+                TableName tableName = 
env.getRegion().getRegionInfo().getTable();
+                HTableInterface htable = env.getTable(tableName);
+                txTable = new TransactionAwareHTable(htable);
+                txTable.startTx(tx);
+                scanner = txTable.getScanner(scan);
+            }
+        } finally {
+            if (txTable != null) txTable.close();
+        }
+        
+        Collection<Pair<Mutation, byte[]>> indexUpdates = new 
ArrayList<Pair<Mutation, byte[]>>(mutations.size() * 2 * 
indexMaintainers.size());
+        if (scanner == null) {
+            for (Mutation m : mutations.values()) {
+                TxTableState state = new TxTableState(env, mutableColumns, 
updateAttributes, tx.getWritePointer(), m);
+                state.applyMutation(m);
+                Iterable<IndexUpdate> updates = codec.getIndexUpserts(state, 
indexMetaData);
+                for (IndexUpdate update : updates) {
+                    indexUpdates.add(new Pair<Mutation, 
byte[]>(update.getUpdate(),update.getTableName()));
+                }
+            }
+        } else {
+            Result result;
+            while ((result = scanner.next()) != null) {
+                TxTableState state = new TxTableState(env, mutableColumns, 
updateAttributes, tx.getWritePointer(), result);
+                Iterable<IndexUpdate> deletes = codec.getIndexDeletes(state, 
indexMetaData);
+                for (IndexUpdate delete : deletes) {
+                    indexUpdates.add(new Pair<Mutation, 
byte[]>(delete.getUpdate(),delete.getTableName()));
+                }
+                Mutation m = mutations.get(new 
ImmutableBytesPtr(result.getRow()));
+                state.applyMutation(m);
+                Iterable<IndexUpdate> updates = codec.getIndexUpserts(state, 
indexMetaData);
+                for (IndexUpdate update : updates) {
+                    indexUpdates.add(new Pair<Mutation, 
byte[]>(update.getUpdate(),update.getTableName()));
+                }
+            }
+        }
+        return indexUpdates;
+    }
+
+
+    private static class TxTableState implements TableState {
+        private final byte[] rowKey;
+        private final long currentTimestamp;
+        private final RegionCoprocessorEnvironment env;
+        private final Map<String, byte[]> attributes;
+        private final List<Cell> pendingUpdates;
+        private final Set<ColumnReference> indexedColumns;
+        private final Map<ColumnReference, ImmutableBytesWritable> valueMap;
+        
+        private TxTableState(RegionCoprocessorEnvironment env, 
Set<ColumnReference> indexedColumns, Map<String, byte[]> attributes, long 
currentTimestamp, byte[] rowKey) {
+            this.env = env;
+            this.currentTimestamp = currentTimestamp;
+            this.indexedColumns = indexedColumns;
+            this.attributes = attributes;
+            this.rowKey = rowKey;
+            int estimatedSize = indexedColumns.size();
+            this.valueMap = Maps.newHashMapWithExpectedSize(estimatedSize);
+            this.pendingUpdates = 
Lists.newArrayListWithExpectedSize(estimatedSize);
+        }
+        
+        public TxTableState(RegionCoprocessorEnvironment env, 
Set<ColumnReference> indexedColumns, Map<String, byte[]> attributes, long 
currentTimestamp, Mutation m) {
+            this(env, indexedColumns, attributes, currentTimestamp, 
m.getRow());
+            applyMutation(m);
+        }
+        
+        public TxTableState(RegionCoprocessorEnvironment env, 
Set<ColumnReference> indexedColumns, Map<String, byte[]> attributes, long 
currentTimestamp, Result r) {
+            this(env, indexedColumns, attributes, currentTimestamp, 
r.getRow());
+
+            for (ColumnReference ref : indexedColumns) {
+                Cell cell = r.getColumnLatestCell(ref.getFamily(), 
ref.getQualifier());
+                if (cell != null) {
+                    ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+                    ptr.set(cell.getValueArray(), cell.getValueOffset(), 
cell.getValueLength());
+                    valueMap.put(ref, ptr);
+                }
+            }
+        }
+        
+        @Override
+        public RegionCoprocessorEnvironment getEnvironment() {
+            return env;
+        }
+
+        @Override
+        public long getCurrentTimestamp() {
+            return currentTimestamp;
+        }
+
+        @Override
+        public Map<String, byte[]> getUpdateAttributes() {
+            return attributes;
+        }
+
+        @Override
+        public byte[] getCurrentRowKey() {
+            return rowKey;
+        }
+
+        @Override
+        public List<? extends IndexedColumnGroup> getIndexColumnHints() {
+            return Collections.emptyList();
+        }
+
+        public void applyMutation(Mutation m) {
+            if (m instanceof Delete) {
+                valueMap.clear();
+            } else {
+                CellScanner scanner = m.cellScanner();
+                try {
+                    while (scanner.advance()) {
+                        Cell cell = scanner.current();
+                        if (cell.getTypeByte() == 
KeyValue.Type.DeleteColumn.getCode()) {
+                            ColumnReference ref = new 
ColumnReference(cell.getFamilyArray(), cell.getFamilyOffset(), 
cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), 
cell.getQualifierLength());
+                            valueMap.remove(ref);
+                        } else if (cell.getTypeByte() == 
KeyValue.Type.DeleteFamily.getCode()) {
+                            for (ColumnReference ref : indexedColumns) {
+                                if (ref.matchesFamily(cell.getFamilyArray(), 
cell.getFamilyOffset(), cell.getFamilyLength())) {
+                                    valueMap.remove(ref);
+                                }
+                            }
+                        } else {
+                            ColumnReference ref = new 
ColumnReference(cell.getFamilyArray(), cell.getFamilyOffset(), 
cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), 
cell.getQualifierLength());
+                            ImmutableBytesWritable ptr = new 
ImmutableBytesWritable();
+                            ptr.set(cell.getValueArray(), 
cell.getValueOffset(), cell.getValueLength());
+                            valueMap.put(ref, ptr);
+                        }
+                    }
+                } catch (IOException e) {
+                    throw new RuntimeException(e); // Impossible
+                }
+            }
+        }
+        
+        @Override
+        public Collection<Cell> getPendingUpdate() {
+            return pendingUpdates;
+        }
+
+        @Override
+        public Pair<ValueGetter, IndexUpdate> getIndexUpdateState(Collection<? 
extends ColumnReference> indexedColumns)
+                throws IOException {
+            // TODO: creating these objects over and over again is wasteful
+            ColumnTracker tracker = new ColumnTracker(indexedColumns);
+            ValueGetter getter = new ValueGetter() {
+
+                @Override
+                public ImmutableBytesWritable getLatestValue(ColumnReference 
ref) throws IOException {
+                    return valueMap.get(ref);
+                }
+
+                @Override
+                public byte[] getRowKey() {
+                    return rowKey;
+                }
+                
+            };
+            Pair<ValueGetter, IndexUpdate> pair = new Pair<ValueGetter, 
IndexUpdate>(getter, new IndexUpdate(tracker));
+            return pair;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTxIndexBuilder.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTxIndexBuilder.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTxIndexBuilder.java
deleted file mode 100644
index c471df6..0000000
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTxIndexBuilder.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.index;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
-import org.apache.phoenix.hbase.index.covered.TxIndexBuilder;
-import org.apache.phoenix.hbase.index.write.IndexWriter;
-import org.apache.phoenix.util.IndexUtil;
-
-public class PhoenixTxIndexBuilder extends TxIndexBuilder {
-    @Override
-    public void setup(RegionCoprocessorEnvironment env) throws IOException {
-        super.setup(env);
-        Configuration conf = env.getConfiguration();
-        // Install failure policy that just re-throws exception instead of 
killing RS
-        // or disabling the index
-        conf.set(IndexWriter.INDEX_FAILURE_POLICY_CONF_KEY, 
PhoenixTxIndexFailurePolicy.class.getName());
-    }
-
-    @Override
-    public void batchStarted(MiniBatchOperationInProgress<Mutation> 
miniBatchOp) throws IOException {
-        // The entire purpose of this method impl is to get the existing rows 
for the
-        // table rows being indexed into the block cache, as the index 
maintenance code
-        // does a point scan per row.
-        // TODO: provide a means for the transactional case to just return the 
Scanner
-        // for when this is executed as it seems like that would be more 
efficient.
-        IndexUtil.loadMutatingRowsIntoBlockCache(this.env.getRegion(), 
getCodec(), miniBatchOp, useRawScanToPrimeBlockCache());
-    }
-
-    private PhoenixIndexCodec getCodec() {
-        return (PhoenixIndexCodec)this.codec;
-    }
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTxIndexFailurePolicy.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTxIndexFailurePolicy.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTxIndexFailurePolicy.java
deleted file mode 100644
index fa70cc9..0000000
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTxIndexFailurePolicy.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
- * License. You may obtain a copy of the License at 
http://www.apache.org/licenses/LICENSE-2.0 Unless required by
- * applicable law or agreed to in writing, software distributed under the 
License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language
- * governing permissions and limitations under the License.
- */
-package org.apache.phoenix.index;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
-import org.apache.phoenix.hbase.index.write.IndexFailurePolicy;
-
-import com.google.common.collect.Multimap;
-
-public class PhoenixTxIndexFailurePolicy implements IndexFailurePolicy {
-    private Stoppable parent;
-
-    @Override
-    public void stop(String why) {
-        if (parent != null) {
-            parent.stop(why);
-        }
-    }
-
-    @Override
-    public boolean isStopped() {
-        return parent == null ? false : parent.isStopped();
-    }
-
-    @Override
-    public void setup(Stoppable parent, RegionCoprocessorEnvironment env) {
-        this.parent = parent;
-    }
-
-    @Override
-    public void handleFailure(Multimap<HTableInterfaceReference, Mutation> 
attempted, Exception cause)
-            throws IOException {
-        if (cause instanceof IOException) {
-            throw (IOException)cause;
-        } else if (cause instanceof RuntimeException) { throw 
(RuntimeException)cause; }
-        throw new IOException(cause);
-    }
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index dae7939..20ab602 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -112,7 +112,7 @@ import 
org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.index.PhoenixIndexBuilder;
 import org.apache.phoenix.index.PhoenixIndexCodec;
-import org.apache.phoenix.index.PhoenixTxIndexBuilder;
+import org.apache.phoenix.index.PhoenixTransactionalIndexer;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
@@ -705,11 +705,18 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
             // all-or-none mutate class which break when this coprocessor is 
installed (PHOENIX-1318).
             if ((tableType != PTableType.INDEX && tableType != PTableType.VIEW)
                     && !SchemaUtil.isMetaTable(tableName)
-                    && !SchemaUtil.isStatsTable(tableName)
-                    && !descriptor.hasCoprocessor(Indexer.class.getName())) {
-                Map<String, String> opts = Maps.newHashMapWithExpectedSize(1);
-                opts.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, 
PhoenixIndexCodec.class.getName());
-                Indexer.enableIndexing(descriptor, isTransactional ? 
PhoenixTxIndexBuilder.class : PhoenixIndexBuilder.class, opts, priority);
+                    && !SchemaUtil.isStatsTable(tableName)) {
+                if (isTransactional) {
+                    if 
(!descriptor.hasCoprocessor(PhoenixTransactionalIndexer.class.getName())) {
+                        
descriptor.addCoprocessor(PhoenixTransactionalIndexer.class.getName(), null, 
priority, null);
+                    }
+                } else {
+                    if (!descriptor.hasCoprocessor(Indexer.class.getName())) {
+                        Map<String, String> opts = 
Maps.newHashMapWithExpectedSize(1);
+                        opts.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, 
PhoenixIndexCodec.class.getName());
+                        Indexer.enableIndexing(descriptor, 
PhoenixIndexBuilder.class, opts, priority);
+                    }
+                }
             }
             if (SchemaUtil.isStatsTable(tableName) && 
!descriptor.hasCoprocessor(MultiRowMutationEndpoint.class.getName())) {
                 
descriptor.addCoprocessor(MultiRowMutationEndpoint.class.getName(),

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index b317d77..8c2e990 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -100,6 +100,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.schema.MetaDataSplitPolicy;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PNameFactory;
@@ -238,7 +239,8 @@ public interface QueryConstants {
             HConstants.VERSIONS + "=" + 
MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS + ",\n" +
             HColumnDescriptor.KEEP_DELETED_CELLS + "="  + 
MetaDataProtocol.DEFAULT_META_DATA_KEEP_DELETED_CELLS + ",\n" +
             // Install split policy to prevent a tenant's metadata from being 
split across regions.
-            HTableDescriptor.SPLIT_POLICY + "='" + 
MetaDataSplitPolicy.class.getName() + "'\n";
+            HTableDescriptor.SPLIT_POLICY + "='" + 
MetaDataSplitPolicy.class.getName() + "',\n" + 
+            PhoenixDatabaseMetaData.TRANSACTIONAL + "=" + Boolean.FALSE;
 
     public static final String CREATE_STATS_TABLE_METADATA =
             "CREATE TABLE " + SYSTEM_CATALOG_SCHEMA + ".\"" + 
SYSTEM_STATS_TABLE + "\"(\n" +
@@ -259,7 +261,8 @@ public interface QueryConstants {
             HConstants.VERSIONS + "=" + 
MetaDataProtocol.DEFAULT_MAX_STAT_DATA_VERSIONS + ",\n" +
             HColumnDescriptor.KEEP_DELETED_CELLS + "="  + 
MetaDataProtocol.DEFAULT_META_DATA_KEEP_DELETED_CELLS + ",\n" +
             // Install split policy to prevent a physical table's stats from 
being split across regions.
-            HTableDescriptor.SPLIT_POLICY + "='" + 
MetaDataSplitPolicy.class.getName() + "'\n";
+            HTableDescriptor.SPLIT_POLICY + "='" + 
MetaDataSplitPolicy.class.getName() + "',\n" + 
+            PhoenixDatabaseMetaData.TRANSACTIONAL + "=" + Boolean.FALSE;
 
     public static final String CREATE_SEQUENCE_METADATA =
             "CREATE TABLE " + SYSTEM_CATALOG_SCHEMA + ".\"" + TYPE_SEQUENCE + 
"\"(\n" +
@@ -277,5 +280,6 @@ public interface QueryConstants {
             LIMIT_REACHED_FLAG + " BOOLEAN \n" +
             " CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (" + 
TENANT_ID + "," + SEQUENCE_SCHEMA + "," + SEQUENCE_NAME + "))\n" +
             HConstants.VERSIONS + "=" + 
MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS + ",\n" +
-            HColumnDescriptor.KEEP_DELETED_CELLS + "="  + 
MetaDataProtocol.DEFAULT_META_DATA_KEEP_DELETED_CELLS + "\n";
+            HColumnDescriptor.KEEP_DELETED_CELLS + "="  + 
MetaDataProtocol.DEFAULT_META_DATA_KEEP_DELETED_CELLS + ",\n" +
+            PhoenixDatabaseMetaData.TRANSACTIONAL + "=" + Boolean.FALSE;
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java 
b/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java
index 265fc78..ee06179 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java
@@ -17,12 +17,23 @@
  */
 package org.apache.phoenix.trace;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterators;
-import org.apache.commons.configuration.Configuration;
+import static org.apache.phoenix.metrics.MetricInfo.ANNOTATION;
+import static org.apache.phoenix.metrics.MetricInfo.DESCRIPTION;
+import static org.apache.phoenix.metrics.MetricInfo.END;
+import static org.apache.phoenix.metrics.MetricInfo.HOSTNAME;
+import static org.apache.phoenix.metrics.MetricInfo.PARENT;
+import static org.apache.phoenix.metrics.MetricInfo.SPAN;
+import static org.apache.phoenix.metrics.MetricInfo.START;
+import static org.apache.phoenix.metrics.MetricInfo.TAG;
+import static org.apache.phoenix.metrics.MetricInfo.TRACE;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
 import org.apache.commons.configuration.SubsetConfiguration;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -31,20 +42,16 @@ import org.apache.hadoop.metrics2.AbstractMetric;
 import org.apache.hadoop.metrics2.MetricsRecord;
 import org.apache.hadoop.metrics2.MetricsSink;
 import org.apache.hadoop.metrics2.MetricsTag;
-import org.apache.phoenix.metrics.*;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.metrics.MetricInfo;
+import org.apache.phoenix.metrics.Metrics;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.trace.util.Tracing;
 import org.apache.phoenix.util.QueryUtil;
 
-import javax.annotation.Nullable;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.util.*;
-
-import static org.apache.phoenix.metrics.MetricInfo.*;
-import static org.apache.phoenix.metrics.MetricInfo.HOSTNAME;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
 
 /**
  * Write the metrics to a phoenix table.
@@ -169,7 +176,9 @@ public class PhoenixMetricsSink implements MetricsSink {
                         TAG_COUNT + " smallint, " +
                         ANNOTATION_COUNT + " smallint" +
                         "  CONSTRAINT pk PRIMARY KEY (" + TRACE.columnName + 
", "
-                        + PARENT.columnName + ", " + SPAN.columnName + "))\n";
+                        + PARENT.columnName + ", " + SPAN.columnName + "))\n" +
+                        PhoenixDatabaseMetaData.TRANSACTIONAL + "=" + 
Boolean.FALSE;
+;
         PreparedStatement stmt = conn.prepareStatement(ddl);
         stmt.execute();
         this.table = table;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
index 1fcf16a..45729a2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -24,8 +24,6 @@ import java.io.DataInputStream;
 import java.io.IOException;
 import java.sql.SQLException;
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -43,15 +41,12 @@ import 
org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
-import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.phoenix.compile.ColumnResolver;
 import org.apache.phoenix.compile.FromCompiler;
 import org.apache.phoenix.compile.IndexStatementRewriter;
-import org.apache.phoenix.compile.ScanRanges;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.compile.WhereCompiler;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
@@ -65,16 +60,13 @@ import 
org.apache.phoenix.expression.visitor.RowKeyExpressionVisitor;
 import org.apache.phoenix.hbase.index.ValueGetter;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.index.IndexMaintainer;
-import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.parse.ParseNode;
 import org.apache.phoenix.parse.SQLParser;
 import org.apache.phoenix.parse.SelectStatement;
-import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
 import org.apache.phoenix.schema.ColumnNotFoundException;
@@ -644,58 +636,4 @@ public class IndexUtil {
         return col.getExpressionStr() == null ? 
IndexUtil.getCaseSensitiveDataColumnFullName(col.getName().getString())
                 : col.getExpressionStr();
     }
-    
-    /*
-     * The entire purpose of this method is to get the existing rows for the 
table rows being indexed into the
-     * block cache, as the index maintenance code does a point scan per row. 
Though for the transactional
-     * case we may be loading more than we need, since we're not applying the 
transaction filters, that
-     * should still be ok.
-     */
-    public static void loadMutatingRowsIntoBlockCache(HRegion region, 
PhoenixIndexCodec codec, MiniBatchOperationInProgress<Mutation> miniBatchOp, 
boolean useRawScan) 
-    throws IOException {
-        List<KeyRange> keys = 
Lists.newArrayListWithExpectedSize(miniBatchOp.size());
-        Map<ImmutableBytesWritable, IndexMaintainer> maintainers =
-                new HashMap<ImmutableBytesWritable, IndexMaintainer>();
-        ImmutableBytesWritable indexTableName = new ImmutableBytesWritable();
-        for (int i = 0; i < miniBatchOp.size(); i++) {
-            Mutation m = miniBatchOp.getOperation(i);
-            keys.add(PVarbinary.INSTANCE.getKeyRange(m.getRow()));
-            List<IndexMaintainer> indexMaintainers = 
codec.getIndexMetaData(m.getAttributesMap()).getIndexMaintainers();
-            
-            for(IndexMaintainer indexMaintainer: indexMaintainers) {
-                if (indexMaintainer.isImmutableRows() && 
indexMaintainer.isLocalIndex()) continue;
-                indexTableName.set(indexMaintainer.getIndexTableName());
-                if (maintainers.get(indexTableName) != null) continue;
-                maintainers.put(indexTableName, indexMaintainer);
-            }
-            
-        }
-        if (maintainers.isEmpty()) return;
-        Scan scan = IndexManagementUtil.newLocalStateScan(new 
ArrayList<IndexMaintainer>(maintainers.values()));
-        scan.setRaw(useRawScan);
-        ScanRanges scanRanges = 
ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, 
Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN);
-        scanRanges.initializeScan(scan);
-        scan.setFilter(scanRanges.getSkipScanFilter());
-        RegionScanner scanner = region.getScanner(scan);
-        // Run through the scanner using internal nextRaw method
-        region.startRegionOperation();
-        try {
-            synchronized (scanner) {
-                boolean hasMore;
-                do {
-                    List<Cell> results = Lists.newArrayList();
-                    // Results are potentially returned even when the return 
value of s.next is
-                    // false since this is an indication of whether or not 
there are more values
-                    // after the ones returned
-                    hasMore = scanner.nextRaw(results);
-                } while (hasMore);
-            }
-        } finally {
-            try {
-                scanner.close();
-            } finally {
-                region.closeRegionOperation();
-            }
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredIndexCodecForTesting.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredIndexCodecForTesting.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredIndexCodecForTesting.java
index 7e73a81..0575b3f 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredIndexCodecForTesting.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredIndexCodecForTesting.java
@@ -16,7 +16,7 @@ import java.util.List;
 
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.phoenix.index.BaseIndexCodec;
+import org.apache.phoenix.hbase.index.builder.BaseIndexCodec;
 
 /**
  * An {@link IndexCodec} for testing that allow you to specify the index 
updates/deletes, regardless of the current
@@ -41,12 +41,12 @@ public class CoveredIndexCodecForTesting extends 
BaseIndexCodec {
     }
 
     @Override
-    public Iterable<IndexUpdate> getIndexDeletes(TableState state) {
+    public Iterable<IndexUpdate> getIndexDeletes(TableState state, 
IndexMetaData context) {
         return this.deletes;
     }
 
     @Override
-    public Iterable<IndexUpdate> getIndexUpserts(TableState state) {
+    public Iterable<IndexUpdate> getIndexUpserts(TableState state, 
IndexMetaData context) {
         return this.updates;
     }
 
@@ -59,7 +59,4 @@ public class CoveredIndexCodecForTesting extends 
BaseIndexCodec {
     public boolean isEnabled(Mutation m) {
         return true;
     }
-
-    @Override
-    public void setContext(TableState state, Mutation mutation) throws 
IOException {}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java
index f0c1483..e4654ea 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.hbase.index.covered.IndexMetaData;
 import org.apache.phoenix.hbase.index.covered.IndexCodec;
 import org.apache.phoenix.hbase.index.covered.IndexUpdate;
 import org.apache.phoenix.hbase.index.covered.LocalTableState;
@@ -180,14 +181,14 @@ public class TestCoveredColumnIndexCodec {
 
     // check the codec for deletes it should send
     LocalTableState state = new LocalTableState(env, table, p);
-    Iterable<IndexUpdate> updates = codec.getIndexDeletes(state);
+    Iterable<IndexUpdate> updates = codec.getIndexDeletes(state, 
IndexMetaData.NULL_INDEX_META_DATA);
     assertFalse("Found index updates without any existing kvs in table!", 
updates.iterator().next()
         .isValid());
 
     // get the updates with the pending update
     state.setCurrentTimestamp(1);
     state.addPendingUpdates(kvs);
-    updates = codec.getIndexUpserts(state);
+    updates = codec.getIndexUpserts(state, IndexMetaData.NULL_INDEX_META_DATA);
     assertTrue("Didn't find index updates for pending primary table update!", 
updates.iterator()
         .hasNext());
     for (IndexUpdate update : updates) {
@@ -210,7 +211,7 @@ public class TestCoveredColumnIndexCodec {
     state = new LocalTableState(env, table, d);
     state.setCurrentTimestamp(2);
     // check the cleanup of the current table, after the puts (mocking a 
'next' update)
-    updates = codec.getIndexDeletes(state);
+    updates = codec.getIndexDeletes(state, IndexMetaData.NULL_INDEX_META_DATA);
     for (IndexUpdate update : updates) {
       assertTrue("Didn't have any index cleanup, even though there is current 
state",
         update.isValid());
@@ -240,7 +241,7 @@ public class TestCoveredColumnIndexCodec {
     state.setCurrentTimestamp(d.getTimeStamp());
     // now we shouldn't see anything when getting the index update
     state.addPendingUpdates(d.getFamilyCellMap().get(FAMILY));
-    Iterable<IndexUpdate> updates = codec.getIndexUpserts(state);
+    Iterable<IndexUpdate> updates = codec.getIndexUpserts(state, 
IndexMetaData.NULL_INDEX_META_DATA);
     for (IndexUpdate update : updates) {
       assertFalse("Had some index updates, though it should have been covered 
by the delete",
         update.isValid());

Reply via email to