http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/98359a40/blur-indexer/src/main/java/org/apache/blur/indexer/mapreduce/LookupBuilderReducer.java
----------------------------------------------------------------------
diff --git a/blur-indexer/src/main/java/org/apache/blur/indexer/mapreduce/LookupBuilderReducer.java b/blur-indexer/src/main/java/org/apache/blur/indexer/mapreduce/LookupBuilderReducer.java
new file mode 100644
index 0000000..43763cf
--- /dev/null
+++ b/blur-indexer/src/main/java/org/apache/blur/indexer/mapreduce/LookupBuilderReducer.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.indexer.mapreduce;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.blur.BlurConfiguration;
+import org.apache.blur.indexer.BlurIndexCounter;
+import org.apache.blur.indexer.IndexerJobDriver;
+import org.apache.blur.indexer.MergeSortRowIdMatcher;
+import org.apache.blur.indexer.MergeSortRowIdMatcher.Action;
+import org.apache.blur.manager.BlurPartitioner;
+import org.apache.blur.manager.writer.SnapshotIndexDeletionPolicy;
+import org.apache.blur.mapreduce.lib.BlurInputFormat;
+import org.apache.blur.mapreduce.lib.BlurOutputFormat;
+import org.apache.blur.store.BlockCacheDirectoryFactoryV2;
+import org.apache.blur.store.hdfs.HdfsDirectory;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.blur.utils.ShardUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.lucene.index.AtomicReader;
+import org.apache.lucene.index.AtomicReaderContext;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexCommit;
+import org.apache.lucene.index.Terms;
+import org.apache.lucene.store.Directory;
+
+import com.google.common.io.Closer;
+
+public class LookupBuilderReducer extends Reducer<Text, NullWritable, Text, BooleanWritable> {
+
+  public static final String BLUR_CACHE_DIR_TOTAL_BYTES = "blur.cache.dir.total.bytes";
+  private Counter _rowIds;
+  private Counter _rowIdsToUpdate;
+
+  private MergeSortRowIdMatcher _matcher;
+  private int _numberOfShardsInTable;
+  private Configuration _configuration;
+  private String _snapshot;
+  private Path _tablePath;
+  private Counter _rowIdsFromIndex;
+  private long _totalNumberOfBytes;
+  private Action _action;
+  private Closer _closer;
+  private Path _cachePath;
+  private String _table;
+  private Writer _writer;
+
+  @Override
+  protected void setup(Reducer<Text, NullWritable, Text, BooleanWritable>.Context context) throws IOException,
+      InterruptedException {
+    _configuration = context.getConfiguration();
+    _rowIds = context.getCounter(BlurIndexCounter.ROW_IDS_FROM_NEW_DATA);
+    _rowIdsToUpdate = context.getCounter(BlurIndexCounter.ROW_IDS_TO_UPDATE_FROM_NEW_DATA);
+    _rowIdsFromIndex = context.getCounter(BlurIndexCounter.ROW_IDS_FROM_INDEX);
+    TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(_configuration);
+    _numberOfShardsInTable = tableDescriptor.getShardCount();
+    _tablePath = new Path(tableDescriptor.getTableUri());
+    _snapshot = ExistingDataIndexLookupMapper.getSnapshot(_configuration);
+    _totalNumberOfBytes = _configuration.getLong(BLUR_CACHE_DIR_TOTAL_BYTES, 128 * 1024 * 1024);
+    _cachePath = BlurInputFormat.getLocalCachePath(_configuration);
+    _table = tableDescriptor.getName();
+    _closer = Closer.create();
+  }
+
+  @Override
+  protected void reduce(Text rowId, Iterable<NullWritable> nothing,
+      final Reducer<Text, NullWritable, Text, BooleanWritable>.Context context) throws IOException, InterruptedException {
+    if (_matcher == null) {
+      _matcher = getMergeSortRowIdMatcher(rowId, context);
+    }
+    if (_writer == null) {
+      _writer = getRowIdWriter(rowId, context);
+    }
+    _writer.append(rowId, NullWritable.get());
+    _rowIds.increment(1);
+    if (_action == null) {
+      _action = new Action() {
+        @Override
+        public void found(Text rowId) throws IOException {
+          _rowIdsToUpdate.increment(1);
+          try {
+            context.write(rowId, new BooleanWritable(true));
+          } catch (InterruptedException e) {
+            throw new IOException(e);
+          }
+        }
+      };
+    }
+    _matcher.lookup(rowId, _action);
+  }
+
+  private Writer getRowIdWriter(Text rowId, Reducer<Text, NullWritable, Text, BooleanWritable>.Context context)
+      throws IOException {
+    BlurPartitioner blurPartitioner = new BlurPartitioner();
+    int shard = blurPartitioner.getShard(rowId, _numberOfShardsInTable);
+    String shardName = ShardUtil.getShardName(shard);
+    Path cachePath = MergeSortRowIdMatcher.getCachePath(_cachePath, _table, shardName);
+    Configuration configuration = context.getConfiguration();
+    String uuid = configuration.get(IndexerJobDriver.BLUR_UPDATE_ID);
+    Path tmpPath = new Path(cachePath, uuid + "_" + getAttemptString(context));
+    return _closer.register(MergeSortRowIdMatcher.createWriter(_configuration, tmpPath));
+  }
+
+  private String getAttemptString(Reducer<Text, NullWritable, Text, BooleanWritable>.Context context) {
+    TaskAttemptID taskAttemptID = context.getTaskAttemptID();
+    return taskAttemptID.toString();
+  }
+
+  @Override
+  protected void cleanup(Reducer<Text, NullWritable, Text, BooleanWritable>.Context context) throws IOException,
+      InterruptedException {
+    _closer.close();
+  }
+
+  private MergeSortRowIdMatcher getMergeSortRowIdMatcher(Text rowId,
+      Reducer<Text, NullWritable, Text, BooleanWritable>.Context context) throws IOException {
+    BlurPartitioner blurPartitioner = new BlurPartitioner();
+    int shard = blurPartitioner.getShard(rowId, _numberOfShardsInTable);
+    String shardName = ShardUtil.getShardName(shard);
+
+    Path shardPath = new Path(_tablePath, shardName);
+    HdfsDirectory hdfsDirectory = new HdfsDirectory(_configuration, shardPath);
+    SnapshotIndexDeletionPolicy policy = new SnapshotIndexDeletionPolicy(_configuration,
+        SnapshotIndexDeletionPolicy.getGenerationsPath(shardPath));
+    Long generation = policy.getGeneration(_snapshot);
+    if (generation == null) {
+      hdfsDirectory.close();
+      throw new IOException("Snapshot [" + _snapshot + "] not found in shard [" + shardPath + "]");
+    }
+
+    BlurConfiguration bc = new BlurConfiguration();
+    BlockCacheDirectoryFactoryV2 blockCacheDirectoryFactoryV2 = new BlockCacheDirectoryFactoryV2(bc,
+        _totalNumberOfBytes);
+    _closer.register(blockCacheDirectoryFactoryV2);
+    Directory dir = blockCacheDirectoryFactoryV2.newDirectory("table", "shard", hdfsDirectory, null);
+    List<IndexCommit> listCommits = DirectoryReader.listCommits(dir);
+    IndexCommit indexCommit = ExistingDataIndexLookupMapper.findIndexCommit(listCommits, generation, shardPath);
+    DirectoryReader reader = DirectoryReader.open(indexCommit);
+    _rowIdsFromIndex.setValue(getTotalNumberOfRowIds(reader));
+
+    Path cachePath = MergeSortRowIdMatcher.getCachePath(_cachePath, _table, shardName);
+    return new MergeSortRowIdMatcher(dir, generation, _configuration, cachePath, context);
+  }
+
+  private long getTotalNumberOfRowIds(DirectoryReader reader) throws IOException {
+    long total = 0;
+    List<AtomicReaderContext> leaves = reader.leaves();
+    for (AtomicReaderContext context : leaves) {
+      AtomicReader atomicReader = context.reader();
+      Terms terms = atomicReader.terms(BlurConstants.ROW_ID);
+      long expectedInsertions = terms.size();
+      if (expectedInsertions < 0) {
+        return -1;
+      }
+      total += expectedInsertions;
+    }
+    return total;
+  }
+}
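
For orientation: LookupBuilderReducer runs in a standalone row-id partitioning
job that the update driver launches ahead of the main indexing job (the
deleted FasterDriver further down shows the original wiring in its
buildPartitionedInputData() method). A minimal sketch of that job setup,
assuming the sketch class sits in the same package as the reducer and that
the caller supplies the lookup-builder mapper class:

    import org.apache.blur.indexer.IndexerJobDriver;
    import org.apache.blur.mapreduce.lib.BlurInputFormat;
    import org.apache.blur.mapreduce.lib.BlurOutputFormat;
    import org.apache.blur.thrift.generated.TableDescriptor;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BooleanWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

    public class LookupBuilderJobSketch {

      @SuppressWarnings("rawtypes")
      public static Job createJob(Configuration conf, TableDescriptor descriptor, String uuid,
          Class<? extends Mapper> mapperClass, Path input, Path output, Path fileCachePath) throws Exception {
        Job job = Job.getInstance(conf, "Partitioning data for table [" + descriptor.getName() + "]");
        // The reducer reads this id back to name its temporary cache files.
        job.getConfiguration().set(IndexerJobDriver.BLUR_UPDATE_ID, uuid);
        // setup() pulls the shard count and table URI from the descriptor.
        BlurOutputFormat.setTableDescriptor(job, descriptor);
        // Root under which the reducer writes its per-shard row-id caches.
        BlurInputFormat.setLocalCachePath(job, fileCachePath);
        FileInputFormat.addInputPath(job, input);
        job.setMapperClass(mapperClass);
        job.setReducerClass(LookupBuilderReducer.class);
        // One reducer per shard, so each reducer sees exactly one shard's row ids.
        job.setNumReduceTasks(descriptor.getShardCount());
        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BooleanWritable.class);
        FileOutputFormat.setOutputPath(job, output);
        return job;
      }
    }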

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/98359a40/blur-indexer/src/main/java/org/apache/blur/indexer/mapreduce/NewDataMapper.java
----------------------------------------------------------------------
diff --git a/blur-indexer/src/main/java/org/apache/blur/indexer/mapreduce/NewDataMapper.java b/blur-indexer/src/main/java/org/apache/blur/indexer/mapreduce/NewDataMapper.java
new file mode 100644
index 0000000..c5ea87a
--- /dev/null
+++ b/blur-indexer/src/main/java/org/apache/blur/indexer/mapreduce/NewDataMapper.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.indexer.mapreduce;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+
+import org.apache.blur.indexer.BlurIndexCounter;
+import org.apache.blur.mapreduce.lib.BlurRecord;
+import org.apache.blur.mapreduce.lib.update.IndexKey;
+import org.apache.blur.mapreduce.lib.update.IndexValue;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+public class NewDataMapper extends Mapper<Text, BlurRecord, IndexKey, IndexValue> {
+
+  private static final IndexValue EMPTY_RECORD = new IndexValue();
+  private long _timestamp;
+  private Counter _newRecords;
+
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    InputSplit inputSplit = context.getInputSplit();
+    FileSplit fileSplit = getFileSplit(inputSplit);
+    Path path = fileSplit.getPath();
+    FileSystem fileSystem = path.getFileSystem(context.getConfiguration());
+    FileStatus fileStatus = fileSystem.getFileStatus(path);
+    _timestamp = fileStatus.getModificationTime();
+    _newRecords = context.getCounter(BlurIndexCounter.NEW_RECORDS);
+  }
+
+  private FileSplit getFileSplit(InputSplit inputSplit) throws IOException {
+    if (inputSplit instanceof FileSplit) {
+      return (FileSplit) inputSplit;
+    }
+    if (inputSplit.getClass().getName().equals("org.apache.hadoop.mapreduce.lib.input.TaggedInputSplit")) {
+      try {
+        Field declaredField = inputSplit.getClass().getDeclaredField("inputSplit");
+        declaredField.setAccessible(true);
+        return getFileSplit((InputSplit) declaredField.get(inputSplit));
+      } catch (NoSuchFieldException e) {
+        throw new IOException(e);
+      } catch (SecurityException e) {
+        throw new IOException(e);
+      } catch (IllegalArgumentException e) {
+        throw new IOException(e);
+      } catch (IllegalAccessException e) {
+        throw new IOException(e);
+      }
+    } else {
+      throw new IOException("Unknown input split type [" + inputSplit + "] [" 
+ inputSplit.getClass() + "]");
+    }
+  }
+
+  @Override
+  protected void map(Text key, BlurRecord blurRecord, Context context) throws IOException, InterruptedException {
+    IndexKey newDataKey = IndexKey.newData(blurRecord.getRowId(), blurRecord.getRecordId(), _timestamp);
+    context.write(newDataKey, new IndexValue(blurRecord));
+    _newRecords.increment(1L);
+
+    IndexKey newDataMarker = IndexKey.newDataMarker(blurRecord.getRowId());
+    context.write(newDataMarker, EMPTY_RECORD);
+  }
+
+}
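
The reflection in getFileSplit() above exists because MultipleInputs wraps
every split in Hadoop's package-private TaggedInputSplit, hiding the
FileSplit whose file modification time this mapper uses as its record
timestamp. A short sketch, assumed to sit beside NewDataMapper, of how the
new-data directories are typically attached through MultipleInputs (modeled
on the deleted FasterDriver further down):

    import java.io.IOException;
    import java.util.List;

    import org.apache.blur.mapreduce.lib.update.IndexKey;
    import org.apache.blur.mapreduce.lib.update.IndexValue;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;

    public class NewDataInputSketch {

      // Route every in-progress new-data directory through NewDataMapper.
      // MultipleInputs is what introduces the TaggedInputSplit wrapper that
      // getFileSplit() unwraps by reflection.
      public static void addNewDataPaths(Job job, List<Path> inprogressPathList) throws IOException {
        for (Path p : inprogressPathList) {
          FileInputFormat.addInputPath(job, p);
          MultipleInputs.addInputPath(job, p, SequenceFileInputFormat.class, NewDataMapper.class);
        }
        job.setMapOutputKeyClass(IndexKey.class);
        job.setMapOutputValueClass(IndexValue.class);
      }
    }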

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/98359a40/blur-indexer/src/main/java/org/apache/blur/indexer/mapreduce/PrunedBlurInputFormat.java
----------------------------------------------------------------------
diff --git a/blur-indexer/src/main/java/org/apache/blur/indexer/mapreduce/PrunedBlurInputFormat.java b/blur-indexer/src/main/java/org/apache/blur/indexer/mapreduce/PrunedBlurInputFormat.java
new file mode 100644
index 0000000..bb957f5
--- /dev/null
+++ b/blur-indexer/src/main/java/org/apache/blur/indexer/mapreduce/PrunedBlurInputFormat.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.indexer.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.blur.indexer.InputSplitPruneUtil;
+import org.apache.blur.mapreduce.lib.BlurInputFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+
+public class PrunedBlurInputFormat extends BlurInputFormat {
+
+  private static final Log LOG = LogFactory.getLog(PrunedBlurInputFormat.class);
+
+  @Override
+  public List<InputSplit> getSplits(JobContext context) throws IOException {
+    Path[] dirs = getInputPaths(context);
+    Configuration configuration = context.getConfiguration();
+    List<BlurInputSplit> splits = getSplits(configuration, dirs);
+    Map<Path, List<BlurInputSplit>> splitMap = new TreeMap<Path, List<BlurInputSplit>>();
+    for (BlurInputSplit split : splits) {
+      Path path = split.getDir();
+      String table = split.getTable().toString();
+      int shard = InputSplitPruneUtil.getShardFromDirectoryPath(path);
+      long rowIdUpdateFromNewDataCount = InputSplitPruneUtil.getBlurLookupRowIdUpdateFromNewDataCount(configuration,
+          table, shard);
+      long indexCount = InputSplitPruneUtil.getBlurLookupRowIdFromIndexCount(configuration, table, shard);
+      if (rowIdUpdateFromNewDataCount == 0 || indexCount == 0) {
+        LOG.debug("Pruning id lookup input path [" + path + "] no overlapping ids.");
+      } else if (InputSplitPruneUtil.shouldLookupExecuteOnShard(configuration, table, shard)) {
+        LOG.debug("Pruning blur input path [" + split.getDir() + "]");
+      } else {
+        LOG.debug("Keeping blur input path [" + split.getDir() + "]");
+        List<BlurInputSplit> list = splitMap.get(path);
+        if (list == null) {
+          splitMap.put(path, list = new ArrayList<BlurInputSplit>());
+        }
+        list.add(split);
+      }
+    }
+    List<InputSplit> result = new ArrayList<InputSplit>();
+    for (List<BlurInputSplit> lst : splitMap.values()) {
+      BlurInputSplitColletion blurInputSplitColletion = new BlurInputSplitColletion();
+      for (BlurInputSplit blurInputSplit : lst) {
+        blurInputSplitColletion.add(blurInputSplit);
+      }
+      result.add(blurInputSplitColletion);
+    }
+    return result;
+  }
+}
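
The per-shard counts consulted in getSplits() above are not computed here;
the driver copies them into the job configuration from the reduce-task
counters of the earlier partitioning job. A sketch of that hand-off, with
the setter signatures assumed to match the InputSplitPruneUtil calls in the
deleted FasterDriver further down (counter arrays indexed by shard):

    import org.apache.blur.indexer.InputSplitPruneUtil;
    import org.apache.hadoop.mapreduce.Job;

    public class PruneCountsSketch {

      // Publish per-shard row-id statistics so PrunedBlurInputFormat and
      // PrunedSequenceFileInputFormat can drop splits for shards where a
      // full scan (or an index lookup) would do no useful work.
      public static void publish(Job job, String table, long[] rowIdsFromNewData,
          long[] rowIdsToUpdateFromNewData, long[] rowIdsFromIndex) {
        InputSplitPruneUtil.setBlurLookupRowIdFromNewDataCounts(job, table, rowIdsFromNewData);
        InputSplitPruneUtil.setBlurLookupRowIdUpdateFromNewDataCounts(job, table, rowIdsToUpdateFromNewData);
        InputSplitPruneUtil.setBlurLookupRowIdFromIndexCounts(job, table, rowIdsFromIndex);
        InputSplitPruneUtil.setTable(job, table);
      }
    }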

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/98359a40/blur-indexer/src/main/java/org/apache/blur/indexer/mapreduce/PrunedSequenceFileInputFormat.java
----------------------------------------------------------------------
diff --git a/blur-indexer/src/main/java/org/apache/blur/indexer/mapreduce/PrunedSequenceFileInputFormat.java b/blur-indexer/src/main/java/org/apache/blur/indexer/mapreduce/PrunedSequenceFileInputFormat.java
new file mode 100644
index 0000000..49095d0
--- /dev/null
+++ b/blur-indexer/src/main/java/org/apache/blur/indexer/mapreduce/PrunedSequenceFileInputFormat.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.indexer.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.blur.indexer.InputSplitPruneUtil;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+
+import com.google.common.base.Splitter;
+
+public class PrunedSequenceFileInputFormat<K, V> extends SequenceFileInputFormat<K, V> {
+
+  private static final Log LOG = LogFactory.getLog(PrunedSequenceFileInputFormat.class);
+
+  @Override
+  public List<InputSplit> getSplits(JobContext job) throws IOException {
+    List<InputSplit> splits = super.getSplits(job);
+    List<InputSplit> results = new ArrayList<InputSplit>();
+    Configuration configuration = job.getConfiguration();
+    String table = InputSplitPruneUtil.getTable(configuration);
+    for (InputSplit inputSplit : splits) {
+      FileSplit fileSplit = (FileSplit) inputSplit;
+      Path path = fileSplit.getPath();
+      LOG.debug("Getting shard index from path [" + path + "]");
+      String name = path.getName();
+      int shard = getShardIndex(name);
+      long rowIdUpdateFromNewDataCount = InputSplitPruneUtil.getBlurLookupRowIdUpdateFromNewDataCount(configuration,
+          table, shard);
+      long indexCount = InputSplitPruneUtil.getBlurLookupRowIdFromIndexCount(configuration, table, shard);
+      if (rowIdUpdateFromNewDataCount == 0 || indexCount == 0) {
+        LOG.debug("Pruning id lookup input path [" + path + "] no overlapping ids.");
+      } else if (InputSplitPruneUtil.shouldLookupExecuteOnShard(configuration, table, shard)) {
+        LOG.debug("Keeping id lookup input path [" + path + "]");
+        results.add(inputSplit);
+      } else {
+        LOG.debug("Pruning id lookup input path [" + path + "]");
+      }
+    }
+    return results;
+  }
+
+  private int getShardIndex(String name) {
+    // based on file format of "part-r-00000", etc
+    Iterable<String> split = Splitter.on('-').split(name);
+    List<String> parts = new ArrayList<String>();
+    for (String s : split) {
+      parts.add(s);
+    }
+    return Integer.parseInt(parts.get(2));
+  }
+
+}
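
getShardIndex() relies on the reducer-output naming convention: splitting
"part-r-00017" on '-' yields [part, r, 00017], and index 2 parses to shard
17. A tiny standalone check of that parse (class name hypothetical):

    import java.util.ArrayList;
    import java.util.List;

    import com.google.common.base.Splitter;

    public class ShardIndexCheck {

      // Same parse as getShardIndex() above.
      static int shardIndex(String name) {
        List<String> parts = new ArrayList<String>();
        for (String s : Splitter.on('-').split(name)) {
          parts.add(s);
        }
        return Integer.parseInt(parts.get(2));
      }

      public static void main(String[] args) {
        System.out.println(shardIndex("part-r-00000")); // prints 0
        System.out.println(shardIndex("part-r-00017")); // prints 17
      }
    }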

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/98359a40/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/BlurIndexCounter.java
----------------------------------------------------------------------
diff --git a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/BlurIndexCounter.java b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/BlurIndexCounter.java
deleted file mode 100644
index 590ba83..0000000
--- a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/BlurIndexCounter.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.mapreduce.lib.update;
-
-public enum BlurIndexCounter {
-
-  NEW_RECORDS, ROW_IDS_FROM_INDEX, ROW_IDS_TO_UPDATE_FROM_NEW_DATA, ROW_IDS_FROM_NEW_DATA,
-
-  INPUT_FORMAT_MAPPER, INPUT_FORMAT_EXISTING_RECORDS,
-
-  LOOKUP_MAPPER, LOOKUP_MAPPER_EXISTING_RECORDS, LOOKUP_MAPPER_ROW_LOOKUP_ATTEMPT
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/98359a40/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/ClusterDriver.java
----------------------------------------------------------------------
diff --git a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/ClusterDriver.java b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/ClusterDriver.java
deleted file mode 100644
index f56b731..0000000
--- a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/ClusterDriver.java
+++ /dev/null
@@ -1,378 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.mapreduce.lib.update;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URL;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.blur.log.Log;
-import org.apache.blur.log.LogFactory;
-import org.apache.blur.mapreduce.lib.BlurInputFormat;
-import org.apache.blur.thirdparty.thrift_0_9_0.TException;
-import org.apache.blur.thrift.BlurClient;
-import org.apache.blur.thrift.generated.Blur.Iface;
-import org.apache.blur.thrift.generated.BlurException;
-import org.apache.blur.thrift.generated.TableDescriptor;
-import org.apache.blur.thrift.generated.TableStats;
-import org.apache.blur.utils.BlurConstants;
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.mapreduce.Cluster;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.JobStatus;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.xml.DOMConfigurator;
-
-public class ClusterDriver extends Configured implements Tool {
-
-  private static final String BLUR_ENV = "blur.env";
-  private static final Log LOG = LogFactory.getLog(ClusterDriver.class);
-  private static final String _SEP = "_";
-  private static final String IMPORT = "import";
-
-  public static void main(String[] args) throws Exception {
-    String logFilePath = System.getenv("BLUR_INDEXER_LOG_FILE");
-    System.out.println("Log file path [" + logFilePath + "]");
-    System.setProperty("BLUR_INDEXER_LOG_FILE", logFilePath);
-    URL url = ClusterDriver.class.getResource("/program-log4j.xml");
-    if (url != null) {
-      LOG.info("Reseting log4j config from classpath resource [{0}]", url);
-      LogManager.resetConfiguration();
-      DOMConfigurator.configure(url);
-    }
-    int res = ToolRunner.run(new Configuration(), new ClusterDriver(), args);
-    System.exit(res);
-  }
-
-  @Override
-  public int run(String[] args) throws Exception {
-    int c = 0;
-    final String blurEnv = args[c++];
-    final String blurZkConnection = args[c++];
-    final String extraConfig = args[c++];
-    final int reducerMultiplier = Integer.parseInt(args[c++]);
-    final Configuration conf = getConf();
-
-    final ExecutorService service = Executors.newCachedThreadPool();
-    final AtomicBoolean running = new AtomicBoolean();
-    running.set(true);
-
-    // Load configs for all filesystems.
-    Path path = new Path(extraConfig);
-    Configuration mergeHdfsConfigs = HdfsConfigurationNamespaceMerge.mergeHdfsConfigs(path.getFileSystem(conf), path);
-    conf.addResource(mergeHdfsConfigs);
-    conf.set(BlurConstants.BLUR_ZOOKEEPER_CONNECTION, blurZkConnection);
-    conf.set(BLUR_ENV, blurEnv);
-
-    final Iface client = BlurClient.getClientFromZooKeeperConnectionStr(blurZkConnection);
-
-    stopAllExistingMRJobs(blurEnv, conf);
-    cleanUpOldImportDirs(client, conf);
-    moveInprogressDirsBackToNew(client, conf);
-    unlockLockedTables(client);
-
-    Map<String, Future<Void>> futures = new HashMap<String, Future<Void>>();
-    while (running.get()) {
-      LOG.debug("Starting index update check for blur cluster [" + 
blurZkConnection + "].");
-      try {
-        List<String> tableList = client.tableList();
-        startMissingIndexerThreads(tableList, service, futures, 
blurZkConnection, conf, client, reducerMultiplier);
-      } catch (TException t) {
-        LOG.error("Unknown Blur Thrift Error, Retrying...", t);
-      }
-      Thread.sleep(TimeUnit.SECONDS.toMillis(10));
-    }
-    return 0;
-  }
-
-  private void unlockLockedTables(Iface client) throws BlurException, TException {
-    List<String> tableList = client.tableList();
-    for (String table : tableList) {
-      TableDescriptor tableDescriptor = client.describe(table);
-      if (tableDescriptor.isEnabled()) {
-        unlockLockedTables(client, table);
-      }
-    }
-  }
-
-  private void unlockLockedTables(Iface client, String table) throws BlurException, TException {
-    Map<String, List<String>> listSnapshots = client.listSnapshots(table);
-    for (Entry<String, List<String>> e : listSnapshots.entrySet()) {
-      List<String> value = e.getValue();
-      if (value.contains(FasterDriver.MRUPDATE_SNAPSHOT)) {
-        LOG.info("Unlocking table [{0}]", table);
-        client.removeSnapshot(table, FasterDriver.MRUPDATE_SNAPSHOT);
-        return;
-      }
-    }
-  }
-
-  private void moveInprogressDirsBackToNew(Iface client, Configuration conf) throws BlurException, TException,
-      IOException {
-    List<String> tableList = client.tableList();
-    for (String table : tableList) {
-      String mrIncWorkingPathStr = getMRIncWorkingPathStr(client, table);
-      Path mrIncWorkingPath = new Path(mrIncWorkingPathStr);
-      Path newData = new Path(mrIncWorkingPath, FasterDriver.NEW);
-      Path inprogressData = new Path(mrIncWorkingPath, FasterDriver.INPROGRESS);
-      FileSystem fileSystem = inprogressData.getFileSystem(conf);
-      FileStatus[] listStatus = fileSystem.listStatus(inprogressData);
-      for (FileStatus fileStatus : listStatus) {
-        Path src = fileStatus.getPath();
-        Path dst = new Path(newData, src.getName());
-        if (fileSystem.rename(src, dst)) {
-          LOG.info("Moved [{0}] to [{1}] to be reprocessed.", src, dst);
-        } else {
-          LOG.error("Could not move [{0}] to [{1}] to be reprocessed.", src, 
dst);
-        }
-      }
-    }
-  }
-
-  private void cleanUpOldImportDirs(Iface client, Configuration conf) throws BlurException, TException, IOException {
-    List<String> tableList = client.tableList();
-    for (String table : tableList) {
-      cleanUpOldImportDirs(client, conf, table);
-    }
-  }
-
-  private void cleanUpOldImportDirs(Iface client, Configuration conf, String table) throws BlurException, TException,
-      IOException {
-    TableDescriptor descriptor = client.describe(table);
-    String tableUri = descriptor.getTableUri();
-    Path tablePath = new Path(tableUri);
-    FileSystem fileSystem = tablePath.getFileSystem(getConf());
-    Path importPath = new Path(tablePath, IMPORT);
-    if (fileSystem.exists(importPath)) {
-      for (FileStatus fileStatus : fileSystem.listStatus(importPath)) {
-        Path path = fileStatus.getPath();
-        LOG.info("Removing failed import [{0}]", path);
-        fileSystem.delete(path, true);
-      }
-    }
-  }
-
-  private void stopAllExistingMRJobs(String blurEnv, Configuration conf) throws YarnException, IOException,
-      InterruptedException {
-    Cluster cluster = new Cluster(conf);
-    JobStatus[] allJobStatuses = cluster.getAllJobStatuses();
-    for (JobStatus jobStatus : allJobStatuses) {
-      if (jobStatus.isJobComplete()) {
-        continue;
-      }
-      String jobFile = jobStatus.getJobFile();
-      JobID jobID = jobStatus.getJobID();
-      Job job = cluster.getJob(jobID);
-      FileSystem fileSystem = FileSystem.get(job.getConfiguration());
-      Configuration configuration = new Configuration(false);
-      Path path = new Path(jobFile);
-      Path makeQualified = path.makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory());
-      if (hasReadAccess(fileSystem, makeQualified)) {
-        try (FSDataInputStream in = fileSystem.open(makeQualified)) {
-          configuration.addResource(copy(in));
-        }
-        String jobBlurEnv = configuration.get(BLUR_ENV);
-        LOG.info("Checking job [{0}] has env [{1}] current env set to [{2}]", 
jobID, jobBlurEnv, blurEnv);
-        if (blurEnv.equals(jobBlurEnv)) {
-          LOG.info("Killing running job [{0}]", jobID);
-          job.killJob();
-        }
-      }
-    }
-  }
-
-  private static InputStream copy(FSDataInputStream input) throws IOException {
-    try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
-      IOUtils.copy(input, outputStream);
-      return new ByteArrayInputStream(outputStream.toByteArray());
-    }
-  }
-
-  private static boolean hasReadAccess(FileSystem fileSystem, Path p) {
-    try {
-      fileSystem.access(p, FsAction.READ);
-      return true;
-    } catch (IOException e) {
-      return false;
-    }
-  }
-
-  private Callable<Void> getCallable(final String blurZkConnection, final Configuration conf, final Iface client,
-      final String table, final int reducerMultiplier) {
-    return new Callable<Void>() {
-      @Override
-      public Void call() throws Exception {
-        String originalThreadName = Thread.currentThread().getName();
-        try {
-          Thread.currentThread().setName(table);
-          if (!isEnabled(client, table)) {
-            LOG.info("Table [" + table + "] is not enabled.");
-            return null;
-          }
-          waitForDataToLoad(client, table);
-          LOG.debug("Starting index update for table [" + table + "].");
-          final String mrIncWorkingPathStr = getMRIncWorkingPathStr(client, table);
-          final String outputPathStr = getOutputPathStr(client, table);
-          Path path = new Path(outputPathStr);
-          FileSystem fileSystem = path.getFileSystem(getConf());
-
-          Configuration configuration = new Configuration(conf);
-          BlurInputFormat.setMaxNumberOfMaps(configuration, 10000);
-
-          FasterDriver driver = new FasterDriver();
-          driver.setConf(configuration);
-          try {
-            driver.run(new String[] { table, mrIncWorkingPathStr, outputPathStr, blurZkConnection,
-                Integer.toString(reducerMultiplier) });
-          } finally {
-            if (fileSystem.exists(path)) {
-              fileSystem.delete(path, true);
-            }
-          }
-          return null;
-        } finally {
-          Thread.currentThread().setName(originalThreadName);
-        }
-      }
-    };
-  }
-
-  private void startMissingIndexerThreads(List<String> tableList, ExecutorService service,
-      Map<String, Future<Void>> futures, final String blurZkConnection, final Configuration conf, final Iface client,
-      int reducerMultiplier) throws BlurException, TException {
-    Set<String> tables = new HashSet<String>(tableList);
-
-    // remove futures that are complete
-    for (String table : tables) {
-      Future<Void> future = futures.get(table);
-      if (future != null) {
-        if (future.isDone()) {
-          try {
-            future.get();
-          } catch (InterruptedException e) {
-            LOG.error("Unknown error while processing table [" + table + "].", 
e);
-          } catch (ExecutionException e) {
-            LOG.error("Unknown error while processing table [" + table + "].", 
e.getCause());
-          }
-          futures.remove(table);
-        } else {
-          LOG.info("Update for table [" + table + "] still running.");
-        }
-      }
-    }
-
-    // start missing tables
-    for (String table : tables) {
-      if (!futures.containsKey(table)) {
-        if (isEnabled(client, table)) {
-          Future<Void> future = service.submit(getCallable(blurZkConnection, conf, client, table, reducerMultiplier));
-          futures.put(table, future);
-        }
-      }
-    }
-  }
-
-  public static void waitForDataToLoad(Iface client, String table) throws BlurException, TException,
-      InterruptedException {
-    if (isFullyLoaded(client.tableStats(table))) {
-      return;
-    }
-    while (true) {
-      TableStats tableStats = client.tableStats(table);
-      if (isFullyLoaded(tableStats)) {
-        LOG.info("Data load complete in table [" + table + "] [" + tableStats 
+ "]");
-        return;
-      }
-      LOG.info("Waiting for data to load in table [" + table + "] [" + 
tableStats + "]");
-      Thread.sleep(5000);
-    }
-  }
-
-  private static boolean isFullyLoaded(TableStats tableStats) {
-    if (tableStats.getSegmentImportInProgressCount() == 0 && tableStats.getSegmentImportPendingCount() == 0) {
-      return true;
-    }
-    return false;
-  }
-
-  private boolean isEnabled(Iface client, String table) throws BlurException, TException {
-    TableDescriptor tableDescriptor = client.describe(table);
-    return tableDescriptor.isEnabled();
-  }
-
-  private void mkdirs(FileSystem fileSystem, Path path) throws IOException {
-    if (fileSystem.exists(path)) {
-      return;
-    }
-    LOG.info("Creating path [" + path + "].");
-    if (!fileSystem.mkdirs(path)) {
-      LOG.error("Path [" + path + "] could not be created.");
-    }
-  }
-
-  public static String getMRIncWorkingPathStr(Iface client, String table) throws BlurException, TException, IOException {
-    TableDescriptor descriptor = client.describe(table);
-    Map<String, String> tableProperties = descriptor.getTableProperties();
-    if (tableProperties != null) {
-      String workingPath = tableProperties.get(BlurConstants.BLUR_BULK_UPDATE_WORKING_PATH);
-      if (workingPath != null) {
-        return workingPath;
-      }
-    }
-    throw new IOException("Table [" + table + "] does not have the property ["
-        + BlurConstants.BLUR_BULK_UPDATE_WORKING_PATH + "] setup correctly.");
-  }
-
-  private String getOutputPathStr(Iface client, String table) throws BlurException, TException, IOException {
-    TableDescriptor descriptor = client.describe(table);
-    String tableUri = descriptor.getTableUri();
-    Path tablePath = new Path(tableUri);
-    FileSystem fileSystem = tablePath.getFileSystem(getConf());
-    Path importPath = new Path(tablePath, IMPORT);
-    mkdirs(fileSystem, importPath);
-    return new Path(importPath, IMPORT + _SEP + System.currentTimeMillis() + _SEP + UUID.randomUUID().toString())
-        .toString();
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/98359a40/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/FasterDriver.java
----------------------------------------------------------------------
diff --git a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/FasterDriver.java b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/FasterDriver.java
deleted file mode 100644
index f43cba5..0000000
--- a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/FasterDriver.java
+++ /dev/null
@@ -1,486 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.mapreduce.lib.update;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.blur.log.Log;
-import org.apache.blur.log.LogFactory;
-import org.apache.blur.mapreduce.lib.BlurInputFormat;
-import org.apache.blur.mapreduce.lib.BlurOutputFormat;
-import org.apache.blur.thirdparty.thrift_0_9_0.TException;
-import org.apache.blur.thrift.BlurClient;
-import org.apache.blur.thrift.generated.Blur.Iface;
-import org.apache.blur.thrift.generated.BlurException;
-import org.apache.blur.thrift.generated.TableDescriptor;
-import org.apache.blur.thrift.generated.TableStats;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.Counters;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.TaskReport;
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-public class FasterDriver extends Configured implements Tool {
-
-  public static final String BLUR_UPDATE_ID = "blur.update.id";
-  private static final String BLUR_EXEC_TYPE = "blur.exec.type";
-  public static final String TMP = "tmp";
-
-  public enum EXEC {
-    MR_ONLY, MR_WITH_LOOKUP, AUTOMATIC
-  }
-
-  public static final String MRUPDATE_SNAPSHOT = "mrupdate-snapshot";
-  public static final String CACHE = "cache";
-  public static final String COMPLETE = "complete";
-  public static final String INPROGRESS = "inprogress";
-  public static final String NEW = "new";
-  private static final Log LOG = LogFactory.getLog(FasterDriver.class);
-
-  public static void main(String[] args) throws Exception {
-    int res = ToolRunner.run(new Configuration(), new FasterDriver(), args);
-    System.exit(res);
-  }
-
-  static class PartitionedInputResult {
-    final Path _partitionedInputData;
-    final Counters _counters;
-    final long[] _rowIdsFromNewData;
-    final long[] _rowIdsToUpdateFromNewData;
-    final long[] _rowIdsFromIndex;
-
-    PartitionedInputResult(Path partitionedInputData, Counters counters, int shards, TaskReport[] taskReports) {
-      _partitionedInputData = partitionedInputData;
-      _counters = counters;
-      _rowIdsFromNewData = new long[shards];
-      _rowIdsToUpdateFromNewData = new long[shards];
-      _rowIdsFromIndex = new long[shards];
-      for (TaskReport tr : taskReports) {
-        int id = tr.getTaskID().getId();
-        Counters taskCounters = tr.getTaskCounters();
-        Counter total = taskCounters.findCounter(BlurIndexCounter.ROW_IDS_FROM_NEW_DATA);
-        _rowIdsFromNewData[id] = total.getValue();
-        Counter update = taskCounters.findCounter(BlurIndexCounter.ROW_IDS_TO_UPDATE_FROM_NEW_DATA);
-        _rowIdsToUpdateFromNewData[id] = update.getValue();
-        Counter index = taskCounters.findCounter(BlurIndexCounter.ROW_IDS_FROM_INDEX);
-        _rowIdsFromIndex[id] = index.getValue();
-      }
-    }
-
-  }
-
-  @Override
-  public int run(String[] args) throws Exception {
-    int c = 0;
-    if (args.length < 5) {
-      System.err
-          .println("Usage Driver <table> <mr inc working path> <output path> 
<zk connection> <reducer multipler> <extra config files...>");
-      return 1;
-    }
-    String table = args[c++];
-    String mrIncWorkingPathStr = args[c++];
-    String outputPathStr = args[c++];
-    String blurZkConnection = args[c++];
-    int reducerMultipler = Integer.parseInt(args[c++]);
-    for (; c < args.length; c++) {
-      String externalConfigFileToAdd = args[c];
-      getConf().addResource(new Path(externalConfigFileToAdd));
-    }
-
-    Path outputPath = new Path(outputPathStr);
-    Path mrIncWorkingPath = new Path(mrIncWorkingPathStr);
-    FileSystem fileSystem = mrIncWorkingPath.getFileSystem(getConf());
-
-    Path newData = new Path(mrIncWorkingPath, NEW);
-    Path inprogressData = new Path(mrIncWorkingPath, INPROGRESS);
-    Path completeData = new Path(mrIncWorkingPath, COMPLETE);
-    Path fileCache = new Path(mrIncWorkingPath, CACHE);
-    Path tmpPathDontDelete = new Path(mrIncWorkingPath, TMP);
-
-    Path tmpPath = new Path(tmpPathDontDelete, UUID.randomUUID().toString());
-
-    fileSystem.mkdirs(newData);
-    fileSystem.mkdirs(inprogressData);
-    fileSystem.mkdirs(completeData);
-    fileSystem.mkdirs(fileCache);
-
-    List<Path> srcPathList = new ArrayList<Path>();
-    for (FileStatus fileStatus : fileSystem.listStatus(newData)) {
-      srcPathList.add(fileStatus.getPath());
-    }
-    if (srcPathList.isEmpty()) {
-      return 0;
-    }
-
-    List<Path> inprogressPathList = new ArrayList<Path>();
-    boolean success = false;
-    Iface client = null;
-
-    EXEC exec = EXEC.valueOf(getConf().get(BLUR_EXEC_TYPE, EXEC.AUTOMATIC.name()).toUpperCase());
-
-    String uuid = UUID.randomUUID().toString();
-
-    try {
-      client = BlurClient.getClientFromZooKeeperConnectionStr(blurZkConnection);
-      TableDescriptor descriptor = client.describe(table);
-      Map<String, String> tableProperties = descriptor.getTableProperties();
-      String fastDir = tableProperties.get("blur.table.disable.fast.dir");
-      if (fastDir == null || !fastDir.equals("true")) {
-        LOG.error("Table [{0}] has blur.table.disable.fast.dir enabled, not 
supported in fast MR update.", table);
-        return 1;
-      }
-
-      waitForOtherSnapshotsToBeRemoved(client, table, MRUPDATE_SNAPSHOT);
-      client.createSnapshot(table, MRUPDATE_SNAPSHOT);
-      TableStats tableStats = client.tableStats(table);
-
-      inprogressPathList = movePathList(fileSystem, inprogressData, srcPathList);
-
-      switch (exec) {
-      case MR_ONLY:
-        success = runMrOnly(descriptor, inprogressPathList, table, fileCache, outputPath, reducerMultipler);
-        break;
-      case MR_WITH_LOOKUP:
-        success = runMrWithLookup(uuid, descriptor, inprogressPathList, table, fileCache, outputPath, reducerMultipler,
-            tmpPath, tableStats, MRUPDATE_SNAPSHOT);
-        break;
-      case AUTOMATIC:
-        success = runAutomatic(uuid, descriptor, inprogressPathList, table, fileCache, outputPath, reducerMultipler,
-            tmpPath, tableStats, MRUPDATE_SNAPSHOT);
-        break;
-      default:
-        throw new RuntimeException("Exec type [" + exec + "] not supported.");
-      }
-    } finally {
-      if (success) {
-        LOG.info("Associate lookup cache with new data!");
-        associateLookupCache(uuid, fileCache, outputPath);
-        LOG.info("Indexing job succeeded!");
-        client.loadData(table, outputPathStr);
-        LOG.info("Load data called");
-        movePathList(fileSystem, completeData, inprogressPathList);
-        LOG.info("Input data moved to complete");
-        ClusterDriver.waitForDataToLoad(client, table);
-        LOG.info("Data loaded");
-      } else {
-        LOG.error("Indexing job failed!");
-        movePathList(fileSystem, newData, inprogressPathList);
-      }
-      fileSystem.delete(tmpPath, true);
-      if (client != null) {
-        client.removeSnapshot(table, MRUPDATE_SNAPSHOT);
-      }
-    }
-
-    if (success) {
-      return 0;
-    } else {
-      return 1;
-    }
-  }
-
-  private void associateLookupCache(String uuid, Path fileCache, Path outputPath) throws IOException {
-    FileSystem fileSystem = fileCache.getFileSystem(getConf());
-    cleanupExtraFileFromSpecX(fileSystem, uuid, fileCache);
-    associateLookupCache(fileSystem, uuid, fileSystem.getFileStatus(fileCache), outputPath);
-  }
-
-  private void cleanupExtraFileFromSpecX(FileSystem fileSystem, String uuid, Path fileCache) throws IOException {
-    FileStatus[] listStatus = fileSystem.listStatus(fileCache);
-    List<FileStatus> uuidPaths = new ArrayList<FileStatus>();
-    for (FileStatus fs : listStatus) {
-      Path path = fs.getPath();
-      if (fs.isDirectory()) {
-        cleanupExtraFileFromSpecX(fileSystem, uuid, path);
-      } else if (path.getName().startsWith(uuid)) {
-        uuidPaths.add(fs);
-      }
-    }
-    if (uuidPaths.size() > 1) {
-      deleteIncomplete(fileSystem, uuidPaths);
-    }
-  }
-
-  private void deleteIncomplete(FileSystem fileSystem, List<FileStatus> uuidPaths) throws IOException {
-    long max = 0;
-    FileStatus keeper = null;
-    for (FileStatus fs : uuidPaths) {
-      long len = fs.getLen();
-      if (len > max) {
-        keeper = fs;
-        max = len;
-      }
-    }
-    for (FileStatus fs : uuidPaths) {
-      if (fs != keeper) {
-        LOG.info("Deleteing incomplete cache file [{0}]", fs.getPath());
-        fileSystem.delete(fs.getPath(), false);
-      }
-    }
-  }
-
-  private void associateLookupCache(FileSystem fileSystem, String uuid, FileStatus fileCache, Path outputPath)
-      throws IOException {
-    Path path = fileCache.getPath();
-    if (fileCache.isDirectory()) {
-      FileStatus[] listStatus = fileSystem.listStatus(path);
-      for (FileStatus fs : listStatus) {
-        associateLookupCache(fileSystem, uuid, fs, outputPath);
-      }
-    } else if (path.getName().startsWith(uuid)) {
-      Path parent = path.getParent();
-      String shardName = parent.getName();
-      Path indexPath = findOutputDirPath(outputPath, shardName);
-      LOG.info("Path found for shard [{0}] outputPath [{1}]", shardName, 
outputPath);
-      String id = MergeSortRowIdMatcher.getIdForSingleSegmentIndex(getConf(), 
indexPath);
-      Path file = new Path(path.getParent(), id + ".seq");
-      MergeSortRowIdMatcher.commitWriter(getConf(), file, path);
-    }
-  }
-
-  private Path findOutputDirPath(Path outputPath, String shardName) throws IOException {
-    FileSystem fileSystem = outputPath.getFileSystem(getConf());
-    Path shardPath = new Path(outputPath, shardName);
-    if (!fileSystem.exists(shardPath)) {
-      throw new IOException("Shard path [" + shardPath + "]");
-    }
-    FileStatus[] listStatus = fileSystem.listStatus(shardPath, new PathFilter() {
-      @Override
-      public boolean accept(Path path) {
-        return path.getName().endsWith(".commit");          
-      }
-    });
-    if (listStatus.length == 1) {
-      FileStatus fs = listStatus[0];
-      return fs.getPath();
-    } else {
-      throw new IOException("More than one sub dir [" + shardPath + "]");
-    }
-  }
-
-  private boolean runAutomatic(String uuid, TableDescriptor descriptor, List<Path> inprogressPathList, String table,
-      Path fileCache, Path outputPath, int reducerMultipler, Path tmpPath, TableStats tableStats, String snapshot)
-      throws ClassNotFoundException, IOException, InterruptedException {
-    PartitionedInputResult result = buildPartitionedInputData(uuid, tmpPath, descriptor, inprogressPathList, snapshot,
-        fileCache);
-
-    Job job = Job.getInstance(getConf(), "Blur Row Updater for table [" + table + "]");
-
-    InputSplitPruneUtil.setBlurLookupRowIdFromNewDataCounts(job, table, result._rowIdsFromNewData);
-    InputSplitPruneUtil.setBlurLookupRowIdUpdateFromNewDataCounts(job, table, result._rowIdsToUpdateFromNewData);
-    InputSplitPruneUtil.setBlurLookupRowIdFromIndexCounts(job, table, result._rowIdsFromIndex);
-    InputSplitPruneUtil.setTable(job, table);
-
-    BlurInputFormat.setLocalCachePath(job, fileCache);
-
-    // Existing data - This adds the copy data files first open and stream
-    // through all documents.
-    {
-      Path tablePath = new Path(descriptor.getTableUri());
-      BlurInputFormat.addTable(job, descriptor, MRUPDATE_SNAPSHOT);
-      MultipleInputs.addInputPath(job, tablePath, PrunedBlurInputFormat.class, MapperForExistingDataMod.class);
-    }
-
-    // Existing data - This adds the row id lookup
-    {
-      MapperForExistingDataWithIndexLookup.setSnapshot(job, MRUPDATE_SNAPSHOT);
-      FileInputFormat.addInputPath(job, result._partitionedInputData);
-      MultipleInputs.addInputPath(job, result._partitionedInputData, PrunedSequenceFileInputFormat.class,
-          MapperForExistingDataWithIndexLookup.class);
-    }
-
-    // New Data
-    for (Path p : inprogressPathList) {
-      FileInputFormat.addInputPath(job, p);
-      MultipleInputs.addInputPath(job, p, SequenceFileInputFormat.class, MapperForNewDataMod.class);
-    }
-
-    BlurOutputFormat.setOutputPath(job, outputPath);
-    BlurOutputFormat.setupJob(job, descriptor);
-
-    job.setReducerClass(UpdateReducer.class);
-    job.setMapOutputKeyClass(IndexKey.class);
-    job.setMapOutputValueClass(IndexValue.class);
-    job.setPartitionerClass(IndexKeyPartitioner.class);
-    job.setGroupingComparatorClass(IndexKeyWritableComparator.class);
-
-    BlurOutputFormat.setReducerMultiplier(job, reducerMultipler);
-
-    boolean success = job.waitForCompletion(true);
-    Counters counters = job.getCounters();
-    LOG.info("Counters [" + counters + "]");
-    return success;
-  }
-
-  private boolean runMrWithLookup(String uuid, TableDescriptor descriptor, List<Path> inprogressPathList, String table,
-      Path fileCache, Path outputPath, int reducerMultipler, Path tmpPath, TableStats tableStats, String snapshot)
-      throws ClassNotFoundException, IOException, InterruptedException {
-    PartitionedInputResult result = buildPartitionedInputData(uuid, tmpPath, descriptor, inprogressPathList, snapshot,
-        fileCache);
-
-    Job job = Job.getInstance(getConf(), "Blur Row Updater for table [" + table + "]");
-
-    MapperForExistingDataWithIndexLookup.setSnapshot(job, MRUPDATE_SNAPSHOT);
-    FileInputFormat.addInputPath(job, result._partitionedInputData);
-    MultipleInputs.addInputPath(job, result._partitionedInputData, SequenceFileInputFormat.class,
-        MapperForExistingDataWithIndexLookup.class);
-
-    for (Path p : inprogressPathList) {
-      FileInputFormat.addInputPath(job, p);
-      MultipleInputs.addInputPath(job, p, SequenceFileInputFormat.class, MapperForNewDataMod.class);
-    }
-
-    BlurOutputFormat.setOutputPath(job, outputPath);
-    BlurOutputFormat.setupJob(job, descriptor);
-
-    job.setReducerClass(UpdateReducer.class);
-    job.setMapOutputKeyClass(IndexKey.class);
-    job.setMapOutputValueClass(IndexValue.class);
-    job.setPartitionerClass(IndexKeyPartitioner.class);
-    job.setGroupingComparatorClass(IndexKeyWritableComparator.class);
-
-    BlurOutputFormat.setReducerMultiplier(job, reducerMultipler);
-
-    boolean success = job.waitForCompletion(true);
-    Counters counters = job.getCounters();
-    LOG.info("Counters [" + counters + "]");
-    return success;
-  }
-
-  private boolean runMrOnly(TableDescriptor descriptor, List<Path> inprogressPathList, String table, Path fileCache,
-      Path outputPath, int reducerMultipler) throws IOException, ClassNotFoundException, InterruptedException {
-    Job job = Job.getInstance(getConf(), "Blur Row Updater for table [" + table + "]");
-    Path tablePath = new Path(descriptor.getTableUri());
-    BlurInputFormat.setLocalCachePath(job, fileCache);
-    BlurInputFormat.addTable(job, descriptor, MRUPDATE_SNAPSHOT);
-    MultipleInputs.addInputPath(job, tablePath, BlurInputFormat.class, MapperForExistingDataMod.class);
-
-    for (Path p : inprogressPathList) {
-      FileInputFormat.addInputPath(job, p);
-      MultipleInputs.addInputPath(job, p, SequenceFileInputFormat.class, MapperForNewDataMod.class);
-    }
-
-    BlurOutputFormat.setOutputPath(job, outputPath);
-    BlurOutputFormat.setupJob(job, descriptor);
-
-    job.setReducerClass(UpdateReducer.class);
-    job.setMapOutputKeyClass(IndexKey.class);
-    job.setMapOutputValueClass(IndexValue.class);
-    job.setPartitionerClass(IndexKeyPartitioner.class);
-    job.setGroupingComparatorClass(IndexKeyWritableComparator.class);
-
-    BlurOutputFormat.setReducerMultiplier(job, reducerMultipler);
-
-    boolean success = job.waitForCompletion(true);
-    Counters counters = job.getCounters();
-    LOG.info("Counters [" + counters + "]");
-    return success;
-  }
-
-  private PartitionedInputResult buildPartitionedInputData(String uuid, Path tmpPath, TableDescriptor descriptor,
-      List<Path> inprogressPathList, String snapshot, Path fileCachePath) throws IOException, ClassNotFoundException,
-      InterruptedException {
-    Job job = Job.getInstance(getConf(), "Partitioning data for table [" + descriptor.getName() + "]");
-    job.getConfiguration().set(BLUR_UPDATE_ID, uuid);
-
-    // Needed for the bloom filter path information.
-    BlurOutputFormat.setTableDescriptor(job, descriptor);
-    BlurInputFormat.setLocalCachePath(job, fileCachePath);
-    MapperForExistingDataWithIndexLookup.setSnapshot(job, snapshot);
-
-    for (Path p : inprogressPathList) {
-      FileInputFormat.addInputPath(job, p);
-    }
-    Path outputPath = new Path(tmpPath, UUID.randomUUID().toString());
-    job.setJarByClass(getClass());
-    job.setMapperClass(LookupBuilderMapper.class);
-    job.setReducerClass(LookupBuilderReducer.class);
-
-    int shardCount = descriptor.getShardCount();
-    job.setNumReduceTasks(shardCount);
-    job.setInputFormatClass(SequenceFileInputFormat.class);
-    job.setMapOutputKeyClass(Text.class);
-    job.setMapOutputValueClass(NullWritable.class);
-    job.setOutputFormatClass(SequenceFileOutputFormat.class);
-    job.setOutputKeyClass(Text.class);
-    job.setOutputValueClass(BooleanWritable.class);
-    FileOutputFormat.setOutputPath(job, outputPath);
-    if (job.waitForCompletion(true)) {
-      return new PartitionedInputResult(outputPath, job.getCounters(), shardCount, job.getTaskReports(TaskType.REDUCE));
-    } else {
-      throw new IOException("Partitioning failed!");
-    }
-  }
-
-  private void waitForOtherSnapshotsToBeRemoved(Iface client, String table, String snapshot) throws BlurException,
-      TException, InterruptedException {
-    while (true) {
-      Map<String, List<String>> listSnapshots = client.listSnapshots(table);
-      boolean mrupdateSnapshots = false;
-      for (Entry<String, List<String>> e : listSnapshots.entrySet()) {
-        List<String> value = e.getValue();
-        if (value.contains(snapshot)) {
-          mrupdateSnapshots = true;
-        }
-      }
-      if (!mrupdateSnapshots) {
-        return;
-      } else {
-        LOG.info("Snapshot [{0}] for table [{1}] already exists", snapshot, table);
-        Thread.sleep(TimeUnit.SECONDS.toMillis(10));
-        LOG.info("Retrying");
-      }
-    }
-  }
-
-  private List<Path> movePathList(FileSystem fileSystem, Path dstDir, List<Path> lst) throws IOException {
-    List<Path> result = new ArrayList<Path>();
-    for (Path src : lst) {
-      Path dst = new Path(dstDir, src.getName());
-      if (fileSystem.rename(src, dst)) {
-        LOG.info("Moving [{0}] to [{1}]", src, dst);
-        result.add(dst);
-      } else {
-        LOG.error("Could not move [{0}] to [{1}]", src, dst);
-      }
-    }
-    return result;
-  }
-
-}
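
The three runMr* methods above configure the shuffle identically. As a minimal sketch (the helper class name is hypothetical, everything it touches is already referenced by the driver above), that wiring could be factored out once:

package org.apache.blur.mapreduce.lib.update;

import org.apache.blur.mapreduce.lib.BlurOutputFormat;
import org.apache.hadoop.mapreduce.Job;

public class UpdateJobWiring {

  // Route IndexKeys by row id and group each row's keys into a single reduce
  // call, exactly as the three runMr* methods do inline above.
  public static void applyUpdateReducerWiring(Job job, int reducerMultiplier) {
    job.setReducerClass(UpdateReducer.class);
    job.setMapOutputKeyClass(IndexKey.class);
    job.setMapOutputValueClass(IndexValue.class);
    job.setPartitionerClass(IndexKeyPartitioner.class);
    job.setGroupingComparatorClass(IndexKeyWritableComparator.class);
    BlurOutputFormat.setReducerMultiplier(job, reducerMultiplier);
  }
}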

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/98359a40/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/HdfsConfigurationNamespaceMerge.java
----------------------------------------------------------------------
diff --git a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/HdfsConfigurationNamespaceMerge.java b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/HdfsConfigurationNamespaceMerge.java
deleted file mode 100644
index de96d24..0000000
--- a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/HdfsConfigurationNamespaceMerge.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.mapreduce.lib.update;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-public class HdfsConfigurationNamespaceMerge {
-
-  private static final String DFS_NAMESERVICES = "dfs.nameservices";
-  private static final Log LOG = LogFactory.getLog(HdfsConfigurationNamespaceMerge.class);
-
-  public static void main(String[] args) throws IOException {
-    Path p = new Path("./src/main/scripts/conf/hdfs");
-
-    Configuration configuration = mergeHdfsConfigs(p.getFileSystem(new Configuration()), p);
-
-    // configuration.writeXml(System.out);
-
-    Collection<String> nameServices = configuration.getStringCollection(DFS_NAMESERVICES);
-    for (String name : nameServices) {
-      Path path = new Path("hdfs://" + name + "/");
-      FileSystem fileSystem = path.getFileSystem(configuration);
-      FileStatus[] listStatus = fileSystem.listStatus(path);
-      for (FileStatus fileStatus : listStatus) {
-        System.out.println(fileStatus.getPath());
-      }
-    }
-  }
-
-  private static boolean checkHostName(String host) {
-    try {
-      InetAddress.getAllByName(host);
-      return true;
-    } catch (UnknownHostException e) {
-      LOG.warn("Host not found [" + host + "]");
-      return false;
-    }
-  }
-
-  public static Configuration mergeHdfsConfigs(FileSystem fs, Path p) throws IOException {
-    List<Configuration> configList = new ArrayList<Configuration>();
-    gatherConfigs(fs, p, configList);
-    return merge(configList);
-  }
-
-  public static Configuration merge(List<Configuration> configList) throws IOException {
-    Configuration merge = new Configuration(false);
-    Set<String> nameServices = new HashSet<String>();
-    for (Configuration configuration : configList) {
-      String nameService = configuration.get(DFS_NAMESERVICES);
-      if (nameServices.contains(nameService)) {
-        throw new IOException("Multiple confs define namespace [" + nameService + "]");
-      }
-      nameServices.add(nameService);
-      if (shouldAdd(configuration, nameService)) {
-        for (Entry<String, String> e : configuration) {
-          String key = e.getKey();
-          if (key.contains(nameService)) {
-            String value = e.getValue();
-            merge.set(key, value);
-          }
-        }
-      }
-    }
-    merge.set(DFS_NAMESERVICES, StringUtils.join(nameServices, ","));
-    return merge;
-  }
-
-  private static boolean shouldAdd(Configuration configuration, String nameService) {
-    for (Entry<String, String> e : configuration) {
-      String key = e.getKey();
-      if (key.contains(nameService) && key.startsWith("dfs.namenode.rpc-address.")) {
-        return checkHostName(getHost(e.getValue()));
-      }
-    }
-    return false;
-  }
-
-  private static String getHost(String host) {
-    return host.substring(0, host.indexOf(":"));
-  }
-
-  public static void gatherConfigs(FileSystem fs, Path p, List<Configuration> configList) throws IOException {
-    if (fs.isFile(p)) {
-      if (p.getName().endsWith(".xml")) {
-        LOG.info("Loading file [" + p + "]");
-        Configuration configuration = new Configuration(false);
-        configuration.addResource(p);
-        configList.add(configuration);
-      } else {
-        LOG.info("Skipping file [" + p + "]");
-      }
-    } else {
-      FileStatus[] listStatus = fs.listStatus(p);
-      for (FileStatus fileStatus : listStatus) {
-        gatherConfigs(fs, fileStatus.getPath(), configList);
-      }
-    }
-  }
-
-}
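
A quick sketch of the merge contract above, under invented namespace and host names (only merge() and the dfs.* keys come from the class; note that shouldAdd() copies a namespace's keys only when its namenode host resolves in DNS, so with unresolvable example hosts only dfs.nameservices survives):

package org.apache.blur.mapreduce.lib.update;

import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;

public class NamespaceMergeExample {

  public static void main(String[] args) throws Exception {
    // Two single-namespace HDFS client configs, as gathered from xml files.
    Configuration nsA = new Configuration(false);
    nsA.set("dfs.nameservices", "nsA");
    nsA.set("dfs.namenode.rpc-address.nsA", "namenode-a.example.com:8020");

    Configuration nsB = new Configuration(false);
    nsB.set("dfs.nameservices", "nsB");
    nsB.set("dfs.namenode.rpc-address.nsB", "namenode-b.example.com:8020");

    Configuration merged = HdfsConfigurationNamespaceMerge.merge(Arrays.asList(nsA, nsB));

    // Always the union of the namespace names, e.g. "nsA,nsB".
    System.out.println(merged.get("dfs.nameservices"));
    // Set only if namenode-a.example.com resolved when merge() ran.
    System.out.println(merged.get("dfs.namenode.rpc-address.nsA"));
  }
}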

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/98359a40/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/InputSplitPruneUtil.java
----------------------------------------------------------------------
diff --git a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/InputSplitPruneUtil.java b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/InputSplitPruneUtil.java
deleted file mode 100644
index 80d1410..0000000
--- a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/InputSplitPruneUtil.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.mapreduce.lib.update;
-
-import org.apache.blur.utils.ShardUtil;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-
-public class InputSplitPruneUtil {
-
-  private static final String BLUR_LOOKUP_ROWID_UPDATE_FROM_NEW_DATA_COUNT_PREFIX = "blur.lookup.rowid.update.from.new.data.count.";
-  private static final String BLUR_LOOKUP_ROWID_FROM_NEW_DATA_COUNT_PREFIX = "blur.lookup.rowid.from.new.data.count.";
-  private static final String BLUR_LOOKUP_ROWID_FROM_INDEX_COUNT_PREFIX = "blur.lookup.rowid.from.index.count.";
-
-  private static final String BLUR_LOOKUP_TABLE = "blur.lookup.table";
-  private static final String BLUR_LOOKUP_RATIO_PER_SHARD = "blur.lookup.ratio.per.shard";
-  private static final String BLUR_LOOKUP_MAX_TOTAL_PER_SHARD = "blur.lookup.max.total.per.shard";
-
-  private static final double DEFAULT_LOOKUP_RATIO = 0.5;
-  private static final long DEFAULT_LOOKUP_MAX_TOTAL = Long.MAX_VALUE;
-
-  public static boolean shouldLookupExecuteOnShard(Configuration configuration, String table, int shard) {
-    double lookupRatio = getLookupRatio(configuration);
-    long maxLookupCount = getMaxLookupCount(configuration);
-    long rowIdFromNewDataCount = getBlurLookupRowIdFromNewDataCount(configuration, table, shard);
-    long rowIdUpdateFromNewDataCount = getBlurLookupRowIdUpdateFromNewDataCount(configuration, table, shard);
-    long rowIdFromIndexCount = getBlurLookupRowIdFromIndexCount(configuration, table, shard);
-    return shouldLookupRun(rowIdFromIndexCount, rowIdFromNewDataCount, rowIdUpdateFromNewDataCount, lookupRatio,
-        maxLookupCount);
-  }
-
-  private static boolean shouldLookupRun(long rowIdFromIndexCount, long rowIdFromNewDataCount,
-      long rowIdUpdateFromNewDataCount, double lookupRatio, long maxLookupCount) {
-    if (rowIdUpdateFromNewDataCount > maxLookupCount) {
-      return false;
-    }
-    double d = (double) rowIdUpdateFromNewDataCount / (double) rowIdFromIndexCount;
-    return d <= lookupRatio;
-  }
-
-  public static double getLookupRatio(Configuration configuration) {
-    return configuration.getDouble(BLUR_LOOKUP_RATIO_PER_SHARD, DEFAULT_LOOKUP_RATIO);
-  }
-
-  private static long getMaxLookupCount(Configuration configuration) {
-    return configuration.getLong(BLUR_LOOKUP_MAX_TOTAL_PER_SHARD, DEFAULT_LOOKUP_MAX_TOTAL);
-  }
-
-  public static void setTable(Job job, String table) {
-    setTable(job.getConfiguration(), table);
-  }
-
-  public static void setTable(Configuration configuration, String table) {
-    configuration.set(BLUR_LOOKUP_TABLE, table);
-  }
-
-  public static String getTable(Configuration configuration) {
-    return configuration.get(BLUR_LOOKUP_TABLE);
-  }
-
-  public static String getBlurLookupRowIdFromIndexCountName(String table) {
-    return BLUR_LOOKUP_ROWID_FROM_INDEX_COUNT_PREFIX + table;
-  }
-
-  public static String getBlurLookupRowIdFromNewDataCountName(String table) {
-    return BLUR_LOOKUP_ROWID_FROM_NEW_DATA_COUNT_PREFIX + table;
-  }
-
-  public static String getBlurLookupRowIdUpdateFromNewDataCountName(String table) {
-    return BLUR_LOOKUP_ROWID_UPDATE_FROM_NEW_DATA_COUNT_PREFIX + table;
-  }
-
-  public static long getBlurLookupRowIdUpdateFromNewDataCount(Configuration configuration, String table, int shard) {
-    String[] strings = configuration.getStrings(getBlurLookupRowIdUpdateFromNewDataCountName(table));
-    return getCount(strings, shard);
-  }
-
-  public static long getBlurLookupRowIdFromNewDataCount(Configuration configuration, String table, int shard) {
-    String[] strings = configuration.getStrings(getBlurLookupRowIdFromNewDataCountName(table));
-    return getCount(strings, shard);
-  }
-
-  public static long getBlurLookupRowIdFromIndexCount(Configuration configuration, String table, int shard) {
-    String[] strings = configuration.getStrings(getBlurLookupRowIdFromIndexCountName(table));
-    return getCount(strings, shard);
-  }
-
-  public static void setBlurLookupRowIdFromNewDataCounts(Job job, String table, long[] counts) {
-    setBlurLookupRowIdFromNewDataCounts(job.getConfiguration(), table, counts);
-  }
-
-  public static void setBlurLookupRowIdFromNewDataCounts(Configuration configuration, String table, long[] counts) {
-    configuration.setStrings(getBlurLookupRowIdFromNewDataCountName(table), toStrings(counts));
-  }
-
-  public static void setBlurLookupRowIdUpdateFromNewDataCounts(Job job, String table, long[] counts) {
-    setBlurLookupRowIdUpdateFromNewDataCounts(job.getConfiguration(), table, counts);
-  }
-
-  public static void setBlurLookupRowIdUpdateFromNewDataCounts(Configuration configuration, String table, long[] counts) {
-    configuration.setStrings(getBlurLookupRowIdUpdateFromNewDataCountName(table), toStrings(counts));
-  }
-
-  public static void setBlurLookupRowIdFromIndexCounts(Job job, String table, long[] counts) {
-    setBlurLookupRowIdFromIndexCounts(job.getConfiguration(), table, counts);
-  }
-
-  public static void setBlurLookupRowIdFromIndexCounts(Configuration configuration, String table, long[] counts) {
-    configuration.setStrings(getBlurLookupRowIdFromIndexCountName(table), toStrings(counts));
-  }
-
-  public static long getCount(String[] strings, int shard) {
-    return Long.parseLong(strings[shard]);
-  }
-
-  public static int getShardFromDirectoryPath(Path path) {
-    return ShardUtil.getShardIndex(path.getName());
-  }
-
-  public static String[] toStrings(long[] counts) {
-    if (counts == null) {
-      return null;
-    }
-    String[] strs = new String[counts.length];
-    for (int i = 0; i < counts.length; i++) {
-      strs[i] = Long.toString(counts[i]);
-    }
-    return strs;
-  }
-
-}
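
The prune decision above compares, per shard, how many row ids from the new data would need an index lookup against how many row ids the shard's index already holds. A small round-trip sketch (the table name and counts are invented; the calls are the ones defined above):

package org.apache.blur.mapreduce.lib.update;

import org.apache.hadoop.conf.Configuration;

public class PruneDecisionExample {

  public static void main(String[] args) {
    Configuration conf = new Configuration(false);
    String table = "table1"; // hypothetical two-shard table
    InputSplitPruneUtil.setTable(conf, table);

    // All count arrays are indexed by shard id.
    InputSplitPruneUtil.setBlurLookupRowIdFromIndexCounts(conf, table, new long[] { 1000L, 1000L });
    InputSplitPruneUtil.setBlurLookupRowIdFromNewDataCounts(conf, table, new long[] { 10L, 900L });
    InputSplitPruneUtil.setBlurLookupRowIdUpdateFromNewDataCounts(conf, table, new long[] { 10L, 900L });

    // Shard 0: 10 / 1000 = 0.01 <= 0.5 (the default ratio), so the lookup runs.
    System.out.println(InputSplitPruneUtil.shouldLookupExecuteOnShard(conf, table, 0)); // true
    // Shard 1: 900 / 1000 = 0.9 > 0.5, so the driver should skip the lookup path.
    System.out.println(InputSplitPruneUtil.shouldLookupExecuteOnShard(conf, table, 1)); // false
  }
}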

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/98359a40/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/LookupBuilderMapper.java
----------------------------------------------------------------------
diff --git a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/LookupBuilderMapper.java b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/LookupBuilderMapper.java
deleted file mode 100644
index 87a3a32..0000000
--- a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/LookupBuilderMapper.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.mapreduce.lib.update;
-
-import java.io.IOException;
-
-import org.apache.blur.mapreduce.lib.BlurRecord;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Mapper;
-
-public class LookupBuilderMapper extends Mapper<Text, BlurRecord, Text, NullWritable> {
-
-  @Override
-  protected void map(Text key, BlurRecord value, Mapper<Text, BlurRecord, Text, NullWritable>.Context context)
-      throws IOException, InterruptedException {
-    context.write(new Text(value.getRowId()), NullWritable.get());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/98359a40/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/LookupBuilderReducer.java
----------------------------------------------------------------------
diff --git a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/LookupBuilderReducer.java b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/LookupBuilderReducer.java
deleted file mode 100644
index f3a2697..0000000
--- a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/LookupBuilderReducer.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.mapreduce.lib.update;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.blur.BlurConfiguration;
-import org.apache.blur.manager.BlurPartitioner;
-import org.apache.blur.manager.writer.SnapshotIndexDeletionPolicy;
-import org.apache.blur.mapreduce.lib.BlurInputFormat;
-import org.apache.blur.mapreduce.lib.BlurOutputFormat;
-import org.apache.blur.mapreduce.lib.update.MergeSortRowIdMatcher.Action;
-import org.apache.blur.store.BlockCacheDirectoryFactoryV2;
-import org.apache.blur.store.hdfs.HdfsDirectory;
-import org.apache.blur.thrift.generated.TableDescriptor;
-import org.apache.blur.utils.BlurConstants;
-import org.apache.blur.utils.ShardUtil;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile.Writer;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.lucene.index.AtomicReader;
-import org.apache.lucene.index.AtomicReaderContext;
-import org.apache.lucene.index.DirectoryReader;
-import org.apache.lucene.index.IndexCommit;
-import org.apache.lucene.index.Terms;
-import org.apache.lucene.store.Directory;
-
-import com.google.common.io.Closer;
-
-public class LookupBuilderReducer extends Reducer<Text, NullWritable, Text, BooleanWritable> {
-
-  public static final String BLUR_CACHE_DIR_TOTAL_BYTES = "blur.cache.dir.total.bytes";
-  private Counter _rowIds;
-  private Counter _rowIdsToUpdate;
-
-  private MergeSortRowIdMatcher _matcher;
-  private int _numberOfShardsInTable;
-  private Configuration _configuration;
-  private String _snapshot;
-  private Path _tablePath;
-  private Counter _rowIdsFromIndex;
-  private long _totalNumberOfBytes;
-  private Action _action;
-  private Closer _closer;
-  private Path _cachePath;
-  private String _table;
-  private Writer _writer;
-
-  @Override
-  protected void setup(Reducer<Text, NullWritable, Text, BooleanWritable>.Context context) throws IOException,
-      InterruptedException {
-    _configuration = context.getConfiguration();
-    _rowIds = context.getCounter(BlurIndexCounter.ROW_IDS_FROM_NEW_DATA);
-    _rowIdsToUpdate = context.getCounter(BlurIndexCounter.ROW_IDS_TO_UPDATE_FROM_NEW_DATA);
-    _rowIdsFromIndex = context.getCounter(BlurIndexCounter.ROW_IDS_FROM_INDEX);
-    TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(_configuration);
-    _numberOfShardsInTable = tableDescriptor.getShardCount();
-    _tablePath = new Path(tableDescriptor.getTableUri());
-    _snapshot = MapperForExistingDataWithIndexLookup.getSnapshot(_configuration);
-    _totalNumberOfBytes = _configuration.getLong(BLUR_CACHE_DIR_TOTAL_BYTES, 128 * 1024 * 1024);
-    _cachePath = BlurInputFormat.getLocalCachePath(_configuration);
-    _table = tableDescriptor.getName();
-    _closer = Closer.create();
-  }
-
-  @Override
-  protected void reduce(Text rowId, Iterable<NullWritable> nothing,
-      final Reducer<Text, NullWritable, Text, BooleanWritable>.Context context) throws IOException,
-      InterruptedException {
-    if (_matcher == null) {
-      _matcher = getMergeSortRowIdMatcher(rowId, context);
-    }
-    if (_writer == null) {
-      _writer = getRowIdWriter(rowId, context);
-    }
-    _writer.append(rowId, NullWritable.get());
-    _rowIds.increment(1);
-    if (_action == null) {
-      // The context is captured by this anonymous callback, which is why the
-      // parameter is declared final above.
-      _action = new Action() {
-        @Override
-        public void found(Text rowId) throws IOException {
-          _rowIdsToUpdate.increment(1);
-          try {
-            context.write(rowId, new BooleanWritable(true));
-          } catch (InterruptedException e) {
-            throw new IOException(e);
-          }
-        }
-      };
-    }
-    _matcher.lookup(rowId, _action);
-  }
-
-  private Writer getRowIdWriter(Text rowId, Reducer<Text, NullWritable, Text, BooleanWritable>.Context context)
-      throws IOException {
-    BlurPartitioner blurPartitioner = new BlurPartitioner();
-    int shard = blurPartitioner.getShard(rowId, _numberOfShardsInTable);
-    String shardName = ShardUtil.getShardName(shard);
-    Path cachePath = MergeSortRowIdMatcher.getCachePath(_cachePath, _table, shardName);
-    Configuration configuration = context.getConfiguration();
-    String uuid = configuration.get(FasterDriver.BLUR_UPDATE_ID);
-    Path tmpPath = new Path(cachePath, uuid + "_" + getAttemptString(context));
-    return _closer.register(MergeSortRowIdMatcher.createWriter(_configuration, tmpPath));
-  }
-
-  private String getAttemptString(Reducer<Text, NullWritable, Text, BooleanWritable>.Context context) {
-    TaskAttemptID taskAttemptID = context.getTaskAttemptID();
-    return taskAttemptID.toString();
-  }
-
-  @Override
-  protected void cleanup(Reducer<Text, NullWritable, Text, BooleanWritable>.Context context) throws IOException,
-      InterruptedException {
-    _closer.close();
-  }
-
-  private MergeSortRowIdMatcher getMergeSortRowIdMatcher(Text rowId,
-      Reducer<Text, NullWritable, Text, BooleanWritable>.Context context) throws IOException {
-    BlurPartitioner blurPartitioner = new BlurPartitioner();
-    int shard = blurPartitioner.getShard(rowId, _numberOfShardsInTable);
-    String shardName = ShardUtil.getShardName(shard);
-
-    Path shardPath = new Path(_tablePath, shardName);
-    HdfsDirectory hdfsDirectory = new HdfsDirectory(_configuration, shardPath);
-    SnapshotIndexDeletionPolicy policy = new SnapshotIndexDeletionPolicy(_configuration,
-        SnapshotIndexDeletionPolicy.getGenerationsPath(shardPath));
-    Long generation = policy.getGeneration(_snapshot);
-    if (generation == null) {
-      hdfsDirectory.close();
-      throw new IOException("Snapshot [" + _snapshot + "] not found in shard [" + shardPath + "]");
-    }
-
-    BlurConfiguration bc = new BlurConfiguration();
-    BlockCacheDirectoryFactoryV2 blockCacheDirectoryFactoryV2 = new BlockCacheDirectoryFactoryV2(bc,
-        _totalNumberOfBytes);
-    _closer.register(blockCacheDirectoryFactoryV2);
-    Directory dir = blockCacheDirectoryFactoryV2.newDirectory("table", "shard", hdfsDirectory, null);
-    List<IndexCommit> listCommits = DirectoryReader.listCommits(dir);
-    IndexCommit indexCommit = MapperForExistingDataWithIndexLookup.findIndexCommit(listCommits, generation, shardPath);
-    // Register the reader so cleanup() closes it along with everything else.
-    DirectoryReader reader = _closer.register(DirectoryReader.open(indexCommit));
-    _rowIdsFromIndex.setValue(getTotalNumberOfRowIds(reader));
-
-    Path cachePath = MergeSortRowIdMatcher.getCachePath(_cachePath, _table, shardName);
-    return new MergeSortRowIdMatcher(dir, generation, _configuration, cachePath, context);
-  }
-
-  private long getTotalNumberOfRowIds(DirectoryReader reader) throws IOException {
-    long total = 0;
-    List<AtomicReaderContext> leaves = reader.leaves();
-    for (AtomicReaderContext context : leaves) {
-      AtomicReader atomicReader = context.reader();
-      Terms terms = atomicReader.terms(BlurConstants.ROW_ID);
-      long expectedInsertions = terms.size();
-      if (expectedInsertions < 0) {
-        return -1;
-      }
-      total += expectedInsertions;
-    }
-    return total;
-  }
-}
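
The reducer above caches a single matcher and writer on the first key because the partitioning job runs one reduce task per shard, so every row id a task sees hashes to the same shard. A tiny sketch of that determinism (the shard count is invented; getShard is used exactly as in the reducer):

package org.apache.blur.mapreduce.lib.update;

import org.apache.blur.manager.BlurPartitioner;
import org.apache.hadoop.io.Text;

public class ShardRoutingExample {

  public static void main(String[] args) {
    BlurPartitioner partitioner = new BlurPartitioner();
    int numberOfShardsInTable = 16; // hypothetical shard count

    // The same row id always maps to the same shard, so a reducer that only
    // ever receives one shard's row ids can cache one shard-scoped matcher.
    Text rowId = new Text("row-12345");
    int first = partitioner.getShard(rowId, numberOfShardsInTable);
    int second = partitioner.getShard(rowId, numberOfShardsInTable);
    System.out.println(first == second); // always true
  }
}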

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/98359a40/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForExistingDataMod.java
----------------------------------------------------------------------
diff --git a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForExistingDataMod.java b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForExistingDataMod.java
deleted file mode 100644
index bf86e19..0000000
--- a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForExistingDataMod.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.mapreduce.lib.update;
-
-import java.io.IOException;
-
-import org.apache.blur.mapreduce.lib.BlurRecord;
-import org.apache.blur.mapreduce.lib.TableBlurRecord;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.Mapper;
-
-public class MapperForExistingDataMod extends Mapper<Text, TableBlurRecord, IndexKey, IndexValue> {
-
-  private Counter _existingRecords;
-
-  @Override
-  protected void setup(Context context) throws IOException, InterruptedException {
-    Counter counter = context.getCounter(BlurIndexCounter.INPUT_FORMAT_MAPPER);
-    counter.increment(1);
-    _existingRecords = context.getCounter(BlurIndexCounter.INPUT_FORMAT_EXISTING_RECORDS);
-  }
-
-  @Override
-  protected void map(Text key, TableBlurRecord value, Context context) throws IOException, InterruptedException {
-    BlurRecord blurRecord = value.getBlurRecord();
-    IndexKey oldDataKey = IndexKey.oldData(blurRecord.getRowId(), blurRecord.getRecordId());
-    context.write(oldDataKey, new IndexValue(blurRecord));
-    _existingRecords.increment(1L);
-  }
-
-}
