http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/98359a40/blur-indexer/src/main/java/org/apache/blur/indexer/mapreduce/LookupBuilderReducer.java
----------------------------------------------------------------------
diff --git a/blur-indexer/src/main/java/org/apache/blur/indexer/mapreduce/LookupBuilderReducer.java b/blur-indexer/src/main/java/org/apache/blur/indexer/mapreduce/LookupBuilderReducer.java
new file mode 100644
index 0000000..43763cf
--- /dev/null
+++ b/blur-indexer/src/main/java/org/apache/blur/indexer/mapreduce/LookupBuilderReducer.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.indexer.mapreduce;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.blur.BlurConfiguration;
+import org.apache.blur.indexer.BlurIndexCounter;
+import org.apache.blur.indexer.IndexerJobDriver;
+import org.apache.blur.indexer.MergeSortRowIdMatcher;
+import org.apache.blur.indexer.MergeSortRowIdMatcher.Action;
+import org.apache.blur.manager.BlurPartitioner;
+import org.apache.blur.manager.writer.SnapshotIndexDeletionPolicy;
+import org.apache.blur.mapreduce.lib.BlurInputFormat;
+import org.apache.blur.mapreduce.lib.BlurOutputFormat;
+import org.apache.blur.store.BlockCacheDirectoryFactoryV2;
+import org.apache.blur.store.hdfs.HdfsDirectory;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.blur.utils.ShardUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.lucene.index.AtomicReader;
+import org.apache.lucene.index.AtomicReaderContext;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexCommit;
+import org.apache.lucene.index.Terms;
+import org.apache.lucene.store.Directory;
+
+import com.google.common.io.Closer;
+
+public class LookupBuilderReducer extends Reducer<Text, NullWritable, Text, BooleanWritable> {
+
+  public static final String BLUR_CACHE_DIR_TOTAL_BYTES = "blur.cache.dir.total.bytes";
+
+  private Counter _rowIds;
+  private Counter _rowIdsToUpdate;
+  private MergeSortRowIdMatcher _matcher;
+  private int _numberOfShardsInTable;
+  private Configuration _configuration;
+  private String _snapshot;
+  private Path _tablePath;
+  private Counter _rowIdsFromIndex;
+  private long _totalNumberOfBytes;
+  private Action _action;
+  private Closer _closer;
+  private Path _cachePath;
+  private String _table;
+  private Writer _writer;
+
+  @Override
+  protected void setup(Reducer<Text, NullWritable, Text, BooleanWritable>.Context context) throws IOException,
+      InterruptedException {
+    _configuration = context.getConfiguration();
+    _rowIds = context.getCounter(BlurIndexCounter.ROW_IDS_FROM_NEW_DATA);
+    _rowIdsToUpdate = context.getCounter(BlurIndexCounter.ROW_IDS_TO_UPDATE_FROM_NEW_DATA);
+    _rowIdsFromIndex = context.getCounter(BlurIndexCounter.ROW_IDS_FROM_INDEX);
+    TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(_configuration);
+    _numberOfShardsInTable = tableDescriptor.getShardCount();
+    _tablePath = new Path(tableDescriptor.getTableUri());
+    _snapshot = ExistingDataIndexLookupMapper.getSnapshot(_configuration);
+    _totalNumberOfBytes = _configuration.getLong(BLUR_CACHE_DIR_TOTAL_BYTES, 128 * 1024 * 1024);
+    _cachePath = BlurInputFormat.getLocalCachePath(_configuration);
+    _table = tableDescriptor.getName();
+    _closer = Closer.create();
+  }
+
+  @Override
+  protected void reduce(Text rowId, Iterable<NullWritable> nothing,
+      final Reducer<Text, NullWritable, Text, BooleanWritable>.Context context) throws IOException,
+      InterruptedException {
+    if (_matcher == null) {
+      _matcher = getMergeSortRowIdMatcher(rowId, context);
+    }
+    if (_writer == null) {
+      _writer = getRowIdWriter(rowId, context);
+    }
+    _writer.append(rowId, NullWritable.get());
+    _rowIds.increment(1);
+    if (_action == null) {
+      _action = new Action() {
+        @Override
+        public void found(Text rowId) throws IOException {
+          _rowIdsToUpdate.increment(1);
+          try {
+            context.write(rowId, new BooleanWritable(true));
+          } catch (InterruptedException e) {
+            throw new IOException(e);
+          }
+        }
+      };
+    }
+    _matcher.lookup(rowId, _action);
+  }
+
+  private Writer getRowIdWriter(Text rowId, Reducer<Text, NullWritable, Text, BooleanWritable>.Context context)
+      throws IOException {
+    BlurPartitioner blurPartitioner = new BlurPartitioner();
+    int shard = blurPartitioner.getShard(rowId, _numberOfShardsInTable);
+    String shardName = ShardUtil.getShardName(shard);
+    Path cachePath = MergeSortRowIdMatcher.getCachePath(_cachePath, _table, shardName);
+    Configuration configuration = context.getConfiguration();
+    String uuid = configuration.get(IndexerJobDriver.BLUR_UPDATE_ID);
+    Path tmpPath = new Path(cachePath, uuid + "_" + getAttemptString(context));
+    return _closer.register(MergeSortRowIdMatcher.createWriter(_configuration, tmpPath));
+  }
+
+  private String getAttemptString(Reducer<Text, NullWritable, Text, BooleanWritable>.Context context) {
+    TaskAttemptID taskAttemptID = context.getTaskAttemptID();
+    return taskAttemptID.toString();
+  }
+
+  @Override
+  protected void cleanup(Reducer<Text, NullWritable, Text, BooleanWritable>.Context context) throws IOException,
+      InterruptedException {
+    _closer.close();
+  }
+
+  private MergeSortRowIdMatcher getMergeSortRowIdMatcher(Text rowId,
+      Reducer<Text, NullWritable, Text, BooleanWritable>.Context context) throws IOException {
+    BlurPartitioner blurPartitioner = new BlurPartitioner();
+    int shard = blurPartitioner.getShard(rowId, _numberOfShardsInTable);
+    String shardName = ShardUtil.getShardName(shard);
+
+    Path shardPath = new Path(_tablePath, shardName);
+    HdfsDirectory hdfsDirectory = new HdfsDirectory(_configuration, shardPath);
+    SnapshotIndexDeletionPolicy policy = new SnapshotIndexDeletionPolicy(_configuration,
+        SnapshotIndexDeletionPolicy.getGenerationsPath(shardPath));
+    Long generation = policy.getGeneration(_snapshot);
+    if (generation == null) {
+      hdfsDirectory.close();
+      throw new IOException("Snapshot [" + _snapshot + "] not found in shard [" + shardPath + "]");
+    }
+
+    BlurConfiguration bc = new BlurConfiguration();
+    BlockCacheDirectoryFactoryV2 blockCacheDirectoryFactoryV2 = new BlockCacheDirectoryFactoryV2(bc,
+        _totalNumberOfBytes);
+    _closer.register(blockCacheDirectoryFactoryV2);
+    Directory dir = blockCacheDirectoryFactoryV2.newDirectory("table", "shard", hdfsDirectory, null);
+    List<IndexCommit> listCommits = DirectoryReader.listCommits(dir);
+    IndexCommit indexCommit = ExistingDataIndexLookupMapper.findIndexCommit(listCommits, generation, shardPath);
+    DirectoryReader reader = DirectoryReader.open(indexCommit);
+    _rowIdsFromIndex.setValue(getTotalNumberOfRowIds(reader));
+
+    Path cachePath = MergeSortRowIdMatcher.getCachePath(_cachePath, _table, shardName);
+    return new MergeSortRowIdMatcher(dir, generation, _configuration, cachePath, context);
+  }
+
+  private long getTotalNumberOfRowIds(DirectoryReader reader) throws IOException {
+    long total = 0;
+    List<AtomicReaderContext> leaves = reader.leaves();
+    for (AtomicReaderContext context : leaves) {
+      AtomicReader atomicReader = context.reader();
+      Terms terms = atomicReader.terms(BlurConstants.ROW_ID);
+      long expectedInsertions = terms.size();
+      if (expectedInsertions < 0) {
+        return -1;
+      }
+      total += expectedInsertions;
+    }
+    return total;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/98359a40/blur-indexer/src/main/java/org/apache/blur/indexer/mapreduce/NewDataMapper.java
----------------------------------------------------------------------
diff --git a/blur-indexer/src/main/java/org/apache/blur/indexer/mapreduce/NewDataMapper.java b/blur-indexer/src/main/java/org/apache/blur/indexer/mapreduce/NewDataMapper.java
new file mode 100644
index 0000000..c5ea87a
--- /dev/null
+++ b/blur-indexer/src/main/java/org/apache/blur/indexer/mapreduce/NewDataMapper.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.indexer.mapreduce;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+
+import org.apache.blur.indexer.BlurIndexCounter;
+import org.apache.blur.mapreduce.lib.BlurRecord;
+import org.apache.blur.mapreduce.lib.update.IndexKey;
+import org.apache.blur.mapreduce.lib.update.IndexValue;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+public class NewDataMapper extends Mapper<Text, BlurRecord, IndexKey, IndexValue> {
+
+  private static final IndexValue EMPTY_RECORD = new IndexValue();
+
+  private long _timestamp;
+  private Counter _newRecords;
+
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    InputSplit inputSplit = context.getInputSplit();
+    FileSplit fileSplit = getFileSplit(inputSplit);
+    Path path = fileSplit.getPath();
+    FileSystem fileSystem = path.getFileSystem(context.getConfiguration());
+    FileStatus fileStatus = fileSystem.getFileStatus(path);
+    _timestamp = fileStatus.getModificationTime();
+    _newRecords = context.getCounter(BlurIndexCounter.NEW_RECORDS);
+  }
+
+  private FileSplit getFileSplit(InputSplit inputSplit) throws IOException {
+    if (inputSplit instanceof FileSplit) {
+      return (FileSplit) inputSplit;
+    }
+    if (inputSplit.getClass().getName().equals("org.apache.hadoop.mapreduce.lib.input.TaggedInputSplit")) {
+      try {
+        Field declaredField = inputSplit.getClass().getDeclaredField("inputSplit");
+        declaredField.setAccessible(true);
+        return getFileSplit((InputSplit) declaredField.get(inputSplit));
+      } catch (NoSuchFieldException e) {
+        throw new IOException(e);
+      } catch (SecurityException e) {
+        throw new IOException(e);
+      } catch (IllegalArgumentException e) {
+        throw new IOException(e);
+      } catch (IllegalAccessException e) {
+        throw new IOException(e);
+      }
+    } else {
+      throw new IOException("Unknown input split type [" + inputSplit + "] [" + inputSplit.getClass() + "]");
+    }
+  }
+
+  @Override
+  protected void map(Text key, BlurRecord blurRecord, Context context) throws IOException, InterruptedException {
+    IndexKey newDataKey = IndexKey.newData(blurRecord.getRowId(), blurRecord.getRecordId(), _timestamp);
+    context.write(newDataKey, new IndexValue(blurRecord));
+    _newRecords.increment(1L);
+
+    IndexKey newDataMarker = IndexKey.newDataMarker(blurRecord.getRowId());
+    context.write(newDataMarker, EMPTY_RECORD);
+  }
+
+}
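The reflective getFileSplit above exists because the drivers later in this commit register inputs through MultipleInputs; in that configuration Hadoop hands each mapper a package-private TaggedInputSplit wrapper instead of a plain FileSplit, so the real split must be unwrapped to read the file's modification time. A sketch of the wiring that triggers the wrapper (the path is a placeholder):

    import org.apache.blur.indexer.mapreduce.NewDataMapper;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;

    // Sketch only: registering an input with a per-path mapper through
    // MultipleInputs makes the framework wrap each split in TaggedInputSplit,
    // which is what NewDataMapper.getFileSplit unwraps reflectively.
    public class TaggedSplitDemo {
      public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "tagged-split-demo");
        MultipleInputs.addInputPath(job, new Path("/data/new"),
            SequenceFileInputFormat.class, NewDataMapper.class);
      }
    }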
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/98359a40/blur-indexer/src/main/java/org/apache/blur/indexer/mapreduce/PrunedBlurInputFormat.java
----------------------------------------------------------------------
diff --git a/blur-indexer/src/main/java/org/apache/blur/indexer/mapreduce/PrunedBlurInputFormat.java b/blur-indexer/src/main/java/org/apache/blur/indexer/mapreduce/PrunedBlurInputFormat.java
new file mode 100644
index 0000000..bb957f5
--- /dev/null
+++ b/blur-indexer/src/main/java/org/apache/blur/indexer/mapreduce/PrunedBlurInputFormat.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.indexer.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.blur.indexer.InputSplitPruneUtil;
+import org.apache.blur.mapreduce.lib.BlurInputFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+
+public class PrunedBlurInputFormat extends BlurInputFormat {
+
+  private static final Log LOG = LogFactory.getLog(PrunedBlurInputFormat.class);
+
+  @Override
+  public List<InputSplit> getSplits(JobContext context) throws IOException {
+    Path[] dirs = getInputPaths(context);
+    Configuration configuration = context.getConfiguration();
+    List<BlurInputSplit> splits = getSplits(configuration, dirs);
+    Map<Path, List<BlurInputSplit>> splitMap = new TreeMap<Path, List<BlurInputSplit>>();
+    for (BlurInputSplit split : splits) {
+      Path path = split.getDir();
+      String table = split.getTable().toString();
+      int shard = InputSplitPruneUtil.getShardFromDirectoryPath(path);
+      long rowIdUpdateFromNewDataCount = InputSplitPruneUtil.getBlurLookupRowIdUpdateFromNewDataCount(configuration,
+          table, shard);
+      long indexCount = InputSplitPruneUtil.getBlurLookupRowIdFromIndexCount(configuration, table, shard);
+      if (rowIdUpdateFromNewDataCount == 0 || indexCount == 0) {
+        LOG.debug("Pruning blur input path [" + path + "] no overlapping ids.");
+      } else if (InputSplitPruneUtil.shouldLookupExecuteOnShard(configuration, table, shard)) {
+        LOG.debug("Pruning blur input path [" + split.getDir() + "]");
+      } else {
+        LOG.debug("Keeping blur input path [" + split.getDir() + "]");
+        List<BlurInputSplit> list = splitMap.get(path);
+        if (list == null) {
+          splitMap.put(path, list = new ArrayList<BlurInputSplit>());
+        }
+        list.add(split);
+      }
+    }
+    List<InputSplit> result = new ArrayList<InputSplit>();
+    for (List<BlurInputSplit> lst : splitMap.values()) {
+      BlurInputSplitColletion blurInputSplitColletion = new BlurInputSplitColletion();
+      for (BlurInputSplit blurInputSplit : lst) {
+        blurInputSplitColletion.add(blurInputSplit);
+      }
+      result.add(blurInputSplitColletion);
+    }
+    return result;
+  }
+}
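The getSplits override above makes a three-way decision per shard: prune when the counters show no overlap between the new data and the shard, prune when the per-row lookup job will cover the shard, and keep (stream the whole shard) otherwise. The companion PrunedSequenceFileInputFormat below inverts the middle branch, keeping exactly the shards this format prunes. A standalone sketch of the predicate, with the counts passed in directly (the real code reads them from the job configuration through InputSplitPruneUtil):

    // Sketch of the three-way pruning decision shared by the two pruned
    // input formats in this commit.
    public class PruneDecisionSketch {
      enum Decision { PRUNE_NO_OVERLAP, PRUNE_LOOKUP_HANDLES_IT, KEEP }

      static Decision decide(long rowIdUpdateFromNewDataCount, long indexCount, boolean lookupExecutesOnShard) {
        if (rowIdUpdateFromNewDataCount == 0 || indexCount == 0) {
          return Decision.PRUNE_NO_OVERLAP;        // nothing in this shard can match the new data
        }
        if (lookupExecutesOnShard) {
          return Decision.PRUNE_LOOKUP_HANDLES_IT; // cheaper per-row lookup covers this shard
        }
        return Decision.KEEP;                      // stream the whole shard index
      }

      public static void main(String[] args) {
        System.out.println(decide(0, 100, false));  // PRUNE_NO_OVERLAP
        System.out.println(decide(10, 100, true));  // PRUNE_LOOKUP_HANDLES_IT
        System.out.println(decide(10, 100, false)); // KEEP
      }
    }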
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/98359a40/blur-indexer/src/main/java/org/apache/blur/indexer/mapreduce/PrunedSequenceFileInputFormat.java
----------------------------------------------------------------------
diff --git a/blur-indexer/src/main/java/org/apache/blur/indexer/mapreduce/PrunedSequenceFileInputFormat.java b/blur-indexer/src/main/java/org/apache/blur/indexer/mapreduce/PrunedSequenceFileInputFormat.java
new file mode 100644
index 0000000..49095d0
--- /dev/null
+++ b/blur-indexer/src/main/java/org/apache/blur/indexer/mapreduce/PrunedSequenceFileInputFormat.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.indexer.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.blur.indexer.InputSplitPruneUtil;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+
+import com.google.common.base.Splitter;
+
+public class PrunedSequenceFileInputFormat<K, V> extends SequenceFileInputFormat<K, V> {
+
+  private static final Log LOG = LogFactory.getLog(PrunedSequenceFileInputFormat.class);
+
+  @Override
+  public List<InputSplit> getSplits(JobContext job) throws IOException {
+    List<InputSplit> splits = super.getSplits(job);
+    List<InputSplit> results = new ArrayList<InputSplit>();
+    Configuration configuration = job.getConfiguration();
+    String table = InputSplitPruneUtil.getTable(configuration);
+    for (InputSplit inputSplit : splits) {
+      FileSplit fileSplit = (FileSplit) inputSplit;
+      Path path = fileSplit.getPath();
+      LOG.debug("Getting shard index from path [" + path + "]");
+      String name = path.getName();
+      int shard = getShardIndex(name);
+      long rowIdUpdateFromNewDataCount = InputSplitPruneUtil.getBlurLookupRowIdUpdateFromNewDataCount(configuration,
+          table, shard);
+      long indexCount = InputSplitPruneUtil.getBlurLookupRowIdFromIndexCount(configuration, table, shard);
+      if (rowIdUpdateFromNewDataCount == 0 || indexCount == 0) {
+        LOG.debug("Pruning id lookup input path [" + path + "] no overlapping ids.");
+      } else if (InputSplitPruneUtil.shouldLookupExecuteOnShard(configuration, table, shard)) {
+        LOG.debug("Keeping id lookup input path [" + path + "]");
+        results.add(inputSplit);
+      } else {
+        LOG.debug("Pruning id lookup input path [" + path + "]");
+      }
+    }
+    return results;
+  }
+
+  private int getShardIndex(String name) {
+    // based on file format of "part-r-00000", etc
+    Iterable<String> split = Splitter.on('-').split(name);
+    List<String> parts = new ArrayList<String>();
+    for (String s : split) {
+      parts.add(s);
+    }
+    return Integer.parseInt(parts.get(2));
+  }
+
+}
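A worked example of the "part-r-00000" parsing in getShardIndex above: splitting on '-' yields ["part", "r", "00017"], and index 2 parses as the reducer (shard) number. Names outside the part-file convention would throw, which is acceptable here because the lookup job writes standard part files.

    import java.util.ArrayList;
    import java.util.List;

    import com.google.common.base.Splitter;

    // Standalone version of the same parse, runnable with Guava on the classpath.
    public class ShardIndexParseSketch {
      static int shardIndex(String name) {
        List<String> parts = new ArrayList<String>();
        for (String s : Splitter.on('-').split(name)) {
          parts.add(s);
        }
        return Integer.parseInt(parts.get(2)); // "00017" -> 17
      }

      public static void main(String[] args) {
        System.out.println(shardIndex("part-r-00017")); // 17
      }
    }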
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/98359a40/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/BlurIndexCounter.java
----------------------------------------------------------------------
diff --git a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/BlurIndexCounter.java b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/BlurIndexCounter.java
deleted file mode 100644
index 590ba83..0000000
--- a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/BlurIndexCounter.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.mapreduce.lib.update;
-
-public enum BlurIndexCounter {
-
-  NEW_RECORDS, ROW_IDS_FROM_INDEX, ROW_IDS_TO_UPDATE_FROM_NEW_DATA, ROW_IDS_FROM_NEW_DATA,
-
-  INPUT_FORMAT_MAPPER, INPUT_FORMAT_EXISTING_RECORDS,
-
-  LOOKUP_MAPPER, LOOKUP_MAPPER_EXISTING_RECORDS, LOOKUP_MAPPER_ROW_LOOKUP_ATTEMPT
-
-}
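This deletion pairs with the org.apache.blur.indexer.BlurIndexCounter package imported by the new files earlier in this commit, so the counter set itself appears to survive as a package move rather than a removal. For context, a small sketch of how the aggregate values are read back from a finished job (FasterDriver, removed below, reads the same counters per reduce task via TaskReport):

    import org.apache.blur.indexer.BlurIndexCounter;
    import org.apache.hadoop.mapreduce.Counters;
    import org.apache.hadoop.mapreduce.Job;

    // Sketch: reading an aggregate counter value from a completed job.
    public class CounterReadSketch {
      static long rowIdsFromIndex(Job completedJob) throws Exception {
        Counters counters = completedJob.getCounters();
        return counters.findCounter(BlurIndexCounter.ROW_IDS_FROM_INDEX).getValue();
      }
    }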
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/98359a40/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/ClusterDriver.java
----------------------------------------------------------------------
diff --git a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/ClusterDriver.java b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/ClusterDriver.java
deleted file mode 100644
index f56b731..0000000
--- a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/ClusterDriver.java
+++ /dev/null
@@ -1,378 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.mapreduce.lib.update;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URL;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.blur.log.Log;
-import org.apache.blur.log.LogFactory;
-import org.apache.blur.mapreduce.lib.BlurInputFormat;
-import org.apache.blur.thirdparty.thrift_0_9_0.TException;
-import org.apache.blur.thrift.BlurClient;
-import org.apache.blur.thrift.generated.Blur.Iface;
-import org.apache.blur.thrift.generated.BlurException;
-import org.apache.blur.thrift.generated.TableDescriptor;
-import org.apache.blur.thrift.generated.TableStats;
-import org.apache.blur.utils.BlurConstants;
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.mapreduce.Cluster;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.JobStatus;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.xml.DOMConfigurator;
-
-public class ClusterDriver extends Configured implements Tool {
-
-  private static final String BLUR_ENV = "blur.env";
-  private static final Log LOG = LogFactory.getLog(ClusterDriver.class);
-  private static final String _SEP = "_";
-  private static final String IMPORT = "import";
-
-  public static void main(String[] args) throws Exception {
-    String logFilePath = System.getenv("BLUR_INDEXER_LOG_FILE");
-    System.out.println("Log file path [" + logFilePath + "]");
-    System.setProperty("BLUR_INDEXER_LOG_FILE", logFilePath);
-    URL url = ClusterDriver.class.getResource("/program-log4j.xml");
-    if (url != null) {
-      LOG.info("Resetting log4j config from classpath resource [{0}]", url);
-      LogManager.resetConfiguration();
-      DOMConfigurator.configure(url);
-    }
-    int res = ToolRunner.run(new Configuration(), new ClusterDriver(), args);
-    System.exit(res);
-  }
-
-  @Override
-  public int run(String[] args) throws Exception {
-    int c = 0;
-    final String blurEnv = args[c++];
-    final String blurZkConnection = args[c++];
-    final String extraConfig = args[c++];
-    final int reducerMultiplier = Integer.parseInt(args[c++]);
-    final Configuration conf = getConf();
-
-    final ExecutorService service = Executors.newCachedThreadPool();
-    final AtomicBoolean running = new AtomicBoolean();
-    running.set(true);
-
-    // Load configs for all filesystems.
-    Path path = new Path(extraConfig);
-    Configuration mergeHdfsConfigs = HdfsConfigurationNamespaceMerge.mergeHdfsConfigs(path.getFileSystem(conf), path);
-    conf.addResource(mergeHdfsConfigs);
-    conf.set(BlurConstants.BLUR_ZOOKEEPER_CONNECTION, blurZkConnection);
-    conf.set(BLUR_ENV, blurEnv);
-
-    final Iface client = BlurClient.getClientFromZooKeeperConnectionStr(blurZkConnection);
-
-    stopAllExistingMRJobs(blurEnv, conf);
-    cleanUpOldImportDirs(client, conf);
-    moveInprogressDirsBackToNew(client, conf);
-    unlockLockedTables(client);
-
-    Map<String, Future<Void>> futures = new HashMap<String, Future<Void>>();
-    while (running.get()) {
-      LOG.debug("Starting index update check for blur cluster [" + blurZkConnection + "].");
-      try {
-        List<String> tableList = client.tableList();
-        startMissingIndexerThreads(tableList, service, futures, blurZkConnection, conf, client, reducerMultiplier);
-      } catch (TException t) {
-        LOG.error("Unknown Blur Thrift Error, Retrying...", t);
-      }
-      Thread.sleep(TimeUnit.SECONDS.toMillis(10));
-    }
-    return 0;
-  }
-
-  private void unlockLockedTables(Iface client) throws BlurException, TException {
-    List<String> tableList = client.tableList();
-    for (String table : tableList) {
-      TableDescriptor tableDescriptor = client.describe(table);
-      if (tableDescriptor.isEnabled()) {
-        unlockLockedTables(client, table);
-      }
-    }
-  }
-
-  private void unlockLockedTables(Iface client, String table) throws BlurException, TException {
-    Map<String, List<String>> listSnapshots = client.listSnapshots(table);
-    for (Entry<String, List<String>> e : listSnapshots.entrySet()) {
-      List<String> value = e.getValue();
-      if (value.contains(FasterDriver.MRUPDATE_SNAPSHOT)) {
-        LOG.info("Unlocking table [{0}]", table);
-        client.removeSnapshot(table, FasterDriver.MRUPDATE_SNAPSHOT);
-        return;
-      }
-    }
-  }
-
-  private void moveInprogressDirsBackToNew(Iface client, Configuration conf) throws BlurException, TException,
-      IOException {
-    List<String> tableList = client.tableList();
-    for (String table : tableList) {
-      String mrIncWorkingPathStr = getMRIncWorkingPathStr(client, table);
-      Path mrIncWorkingPath = new Path(mrIncWorkingPathStr);
-      Path newData = new Path(mrIncWorkingPath, FasterDriver.NEW);
-      Path inprogressData = new Path(mrIncWorkingPath, FasterDriver.INPROGRESS);
-      FileSystem fileSystem = inprogressData.getFileSystem(conf);
-      FileStatus[] listStatus = fileSystem.listStatus(inprogressData);
-      for (FileStatus fileStatus : listStatus) {
-        Path src = fileStatus.getPath();
-        Path dst = new Path(newData, src.getName());
-        if (fileSystem.rename(src, dst)) {
-          LOG.info("Moved [{0}] to [{1}] to be reprocessed.", src, dst);
-        } else {
-          LOG.error("Could not move [{0}] to [{1}] to be reprocessed.", src, dst);
-        }
-      }
-    }
-  }
-
-  private void cleanUpOldImportDirs(Iface client, Configuration conf) throws BlurException, TException, IOException {
-    List<String> tableList = client.tableList();
-    for (String table : tableList) {
-      cleanUpOldImportDirs(client, conf, table);
-    }
-  }
-
-  private void cleanUpOldImportDirs(Iface client, Configuration conf, String table) throws BlurException, TException,
-      IOException {
-    TableDescriptor descriptor = client.describe(table);
-    String tableUri = descriptor.getTableUri();
-    Path tablePath = new Path(tableUri);
-    FileSystem fileSystem = tablePath.getFileSystem(getConf());
-    Path importPath = new Path(tablePath, IMPORT);
-    if (fileSystem.exists(importPath)) {
-      for (FileStatus fileStatus : fileSystem.listStatus(importPath)) {
-        Path path = fileStatus.getPath();
-        LOG.info("Removing failed import [{0}]", path);
-        fileSystem.delete(path, true);
-      }
-    }
-  }
-
-  private void stopAllExistingMRJobs(String blurEnv, Configuration conf) throws YarnException, IOException,
-      InterruptedException {
-    Cluster cluster = new Cluster(conf);
-    JobStatus[] allJobStatuses = cluster.getAllJobStatuses();
-    for (JobStatus jobStatus : allJobStatuses) {
-      if (jobStatus.isJobComplete()) {
-        continue;
-      }
-      String jobFile = jobStatus.getJobFile();
-      JobID jobID = jobStatus.getJobID();
-      Job job = cluster.getJob(jobID);
-      FileSystem fileSystem = FileSystem.get(job.getConfiguration());
-      Configuration configuration = new Configuration(false);
-      Path path = new Path(jobFile);
-      Path makeQualified = path.makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory());
-      if (hasReadAccess(fileSystem, makeQualified)) {
-        try (FSDataInputStream in = fileSystem.open(makeQualified)) {
-          configuration.addResource(copy(in));
-        }
-        String jobBlurEnv = configuration.get(BLUR_ENV);
-        LOG.info("Checking job [{0}] has env [{1}] current env set to [{2}]", jobID, jobBlurEnv, blurEnv);
-        if (blurEnv.equals(jobBlurEnv)) {
-          LOG.info("Killing running job [{0}]", jobID);
-          job.killJob();
-        }
-      }
-    }
-  }
-
-  private static InputStream copy(FSDataInputStream input) throws IOException {
-    try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
-      IOUtils.copy(input, outputStream);
-      return new ByteArrayInputStream(outputStream.toByteArray());
-    }
-  }
-
-  private static boolean hasReadAccess(FileSystem fileSystem, Path p) {
-    try {
-      fileSystem.access(p, FsAction.READ);
-      return true;
-    } catch (IOException e) {
-      return false;
-    }
-  }
-
-  private Callable<Void> getCallable(final String blurZkConnection, final Configuration conf, final Iface client,
-      final String table, final int reducerMultiplier) {
-    return new Callable<Void>() {
-      @Override
-      public Void call() throws Exception {
-        String originalThreadName = Thread.currentThread().getName();
-        try {
-          Thread.currentThread().setName(table);
-          if (!isEnabled(client, table)) {
-            LOG.info("Table [" + table + "] is not enabled.");
-            return null;
-          }
-          waitForDataToLoad(client, table);
-          LOG.debug("Starting index update for table [" + table + "].");
-          final String mrIncWorkingPathStr = getMRIncWorkingPathStr(client, table);
-          final String outputPathStr = getOutputPathStr(client, table);
-          Path path = new Path(outputPathStr);
-          FileSystem fileSystem = path.getFileSystem(getConf());
-
-          Configuration configuration = new Configuration(conf);
-          BlurInputFormat.setMaxNumberOfMaps(configuration, 10000);
-
-          FasterDriver driver = new FasterDriver();
-          driver.setConf(configuration);
-          try {
-            driver.run(new String[] { table, mrIncWorkingPathStr, outputPathStr, blurZkConnection,
-                Integer.toString(reducerMultiplier) });
-          } finally {
-            if (fileSystem.exists(path)) {
-              fileSystem.delete(path, true);
-            }
-          }
-          return null;
-        } finally {
-          Thread.currentThread().setName(originalThreadName);
-        }
-      }
-    };
-  }
-
-  private void startMissingIndexerThreads(List<String> tableList, ExecutorService service,
-      Map<String, Future<Void>> futures, final String blurZkConnection, final Configuration conf, final Iface client,
-      int reducerMultiplier) throws BlurException, TException {
-    Set<String> tables = new HashSet<String>(tableList);
-
-    // remove futures that are complete
-    for (String table : tables) {
-      Future<Void> future = futures.get(table);
-      if (future != null) {
-        if (future.isDone()) {
-          try {
-            future.get();
-          } catch (InterruptedException e) {
-            LOG.error("Unknown error while processing table [" + table + "].", e);
-          } catch (ExecutionException e) {
-            LOG.error("Unknown error while processing table [" + table + "].", e.getCause());
-          }
-          futures.remove(table);
-        } else {
-          LOG.info("Update for table [" + table + "] still running.");
-        }
-      }
-    }
-
-    // start missing tables
-    for (String table : tables) {
-      if (!futures.containsKey(table)) {
-        if (isEnabled(client, table)) {
-          Future<Void> future = service.submit(getCallable(blurZkConnection, conf, client, table, reducerMultiplier));
-          futures.put(table, future);
-        }
-      }
-    }
-  }
-
-  public static void waitForDataToLoad(Iface client, String table) throws BlurException, TException,
-      InterruptedException {
-    if (isFullyLoaded(client.tableStats(table))) {
-      return;
-    }
-    while (true) {
-      TableStats tableStats = client.tableStats(table);
-      if (isFullyLoaded(tableStats)) {
-        LOG.info("Data load complete in table [" + table + "] [" + tableStats + "]");
-        return;
-      }
-      LOG.info("Waiting for data to load in table [" + table + "] [" + tableStats + "]");
-      Thread.sleep(5000);
-    }
-  }
-
-  private static boolean isFullyLoaded(TableStats tableStats) {
-    if (tableStats.getSegmentImportInProgressCount() == 0 && tableStats.getSegmentImportPendingCount() == 0) {
-      return true;
-    }
-    return false;
-  }
-
-  private boolean isEnabled(Iface client, String table) throws BlurException, TException {
-    TableDescriptor tableDescriptor = client.describe(table);
-    return tableDescriptor.isEnabled();
-  }
-
-  private void mkdirs(FileSystem fileSystem, Path path) throws IOException {
-    if (fileSystem.exists(path)) {
-      return;
-    }
-    LOG.info("Creating path [" + path + "].");
-    if (!fileSystem.mkdirs(path)) {
-      LOG.error("Path [" + path + "] could not be created.");
-    }
-  }
-
-  public static String getMRIncWorkingPathStr(Iface client, String table) throws BlurException, TException,
-      IOException {
-    TableDescriptor descriptor = client.describe(table);
-    Map<String, String> tableProperties = descriptor.getTableProperties();
-    if (tableProperties != null) {
-      String workingPath = tableProperties.get(BlurConstants.BLUR_BULK_UPDATE_WORKING_PATH);
-      if (workingPath != null) {
-        return workingPath;
-      }
-    }
-    throw new IOException("Table [" + table + "] does not have the property ["
-        + BlurConstants.BLUR_BULK_UPDATE_WORKING_PATH + "] setup correctly.");
-  }
-
-  private String getOutputPathStr(Iface client, String table) throws BlurException, TException, IOException {
-    TableDescriptor descriptor = client.describe(table);
-    String tableUri = descriptor.getTableUri();
-    Path tablePath = new Path(tableUri);
-    FileSystem fileSystem = tablePath.getFileSystem(getConf());
-    Path importPath = new Path(tablePath, IMPORT);
-    mkdirs(fileSystem, importPath);
-    return new Path(importPath, IMPORT + _SEP + System.currentTimeMillis() + _SEP + UUID.randomUUID().toString())
-        .toString();
-  }
-
-}
\ No newline at end of file
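One detail of stopAllExistingMRJobs above is worth noting: the driver does not kill every running job, only those whose submitted job.xml carries a matching "blur.env" value, so several indexer environments can share one cluster. A condensed sketch of that check; this version assumes the Job returned by Cluster.getJob() can load the submitted configuration, while the original more robustly reads job.xml from the filesystem itself, behind an explicit access check:

    import org.apache.hadoop.mapreduce.Cluster;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.JobStatus;

    // Sketch only: env-scoped job kill. Assumes the job configuration is
    // loadable through Cluster.getJob(); see the original method for the
    // HDFS-based job.xml read used in practice.
    public class EnvScopedKillSketch {
      static void killJobsForEnv(Cluster cluster, String blurEnv) throws Exception {
        for (JobStatus status : cluster.getAllJobStatuses()) {
          if (status.isJobComplete()) {
            continue;
          }
          Job job = cluster.getJob(status.getJobID());
          String jobBlurEnv = job.getConfiguration().get("blur.env");
          if (blurEnv.equals(jobBlurEnv)) {
            job.killJob(); // same environment, assumed to be ours to stop
          }
        }
      }
    }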
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/98359a40/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/FasterDriver.java
----------------------------------------------------------------------
diff --git a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/FasterDriver.java b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/FasterDriver.java
deleted file mode 100644
index f43cba5..0000000
--- a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/FasterDriver.java
+++ /dev/null
@@ -1,486 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.mapreduce.lib.update;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.blur.log.Log;
-import org.apache.blur.log.LogFactory;
-import org.apache.blur.mapreduce.lib.BlurInputFormat;
-import org.apache.blur.mapreduce.lib.BlurOutputFormat;
-import org.apache.blur.thirdparty.thrift_0_9_0.TException;
-import org.apache.blur.thrift.BlurClient;
-import org.apache.blur.thrift.generated.Blur.Iface;
-import org.apache.blur.thrift.generated.BlurException;
-import org.apache.blur.thrift.generated.TableDescriptor;
-import org.apache.blur.thrift.generated.TableStats;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.Counters;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.TaskReport;
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-public class FasterDriver extends Configured implements Tool {
-
-  public static final String BLUR_UPDATE_ID = "blur.update.id";
-  private static final String BLUR_EXEC_TYPE = "blur.exec.type";
-  public static final String TMP = "tmp";
-
-  public enum EXEC {
-    MR_ONLY, MR_WITH_LOOKUP, AUTOMATIC
-  }
-
-  public static final String MRUPDATE_SNAPSHOT = "mrupdate-snapshot";
-  public static final String CACHE = "cache";
-  public static final String COMPLETE = "complete";
-  public static final String INPROGRESS = "inprogress";
-  public static final String NEW = "new";
-  private static final Log LOG = LogFactory.getLog(FasterDriver.class);
-
-  public static void main(String[] args) throws Exception {
-    int res = ToolRunner.run(new Configuration(), new FasterDriver(), args);
-    System.exit(res);
-  }
-
-  static class PartitionedInputResult {
-    final Path _partitionedInputData;
-    final Counters _counters;
-    final long[] _rowIdsFromNewData;
-    final long[] _rowIdsToUpdateFromNewData;
-    final long[] _rowIdsFromIndex;
-
-    PartitionedInputResult(Path partitionedInputData, Counters counters, int shards, TaskReport[] taskReports) {
-      _partitionedInputData = partitionedInputData;
-      _counters = counters;
-      _rowIdsFromNewData = new long[shards];
-      _rowIdsToUpdateFromNewData = new long[shards];
-      _rowIdsFromIndex = new long[shards];
-      for (TaskReport tr : taskReports) {
-        int id = tr.getTaskID().getId();
-        Counters taskCounters = tr.getTaskCounters();
-        Counter total = taskCounters.findCounter(BlurIndexCounter.ROW_IDS_FROM_NEW_DATA);
-        _rowIdsFromNewData[id] = total.getValue();
-        Counter update = taskCounters.findCounter(BlurIndexCounter.ROW_IDS_TO_UPDATE_FROM_NEW_DATA);
-        _rowIdsToUpdateFromNewData[id] = update.getValue();
-        Counter index = taskCounters.findCounter(BlurIndexCounter.ROW_IDS_FROM_INDEX);
-        _rowIdsFromIndex[id] = index.getValue();
-      }
-    }
-
-  }
-
-  @Override
-  public int run(String[] args) throws Exception {
-    int c = 0;
-    if (args.length < 5) {
-      System.err
-          .println("Usage: Driver <table> <mr inc working path> <output path> <zk connection> <reducer multiplier> <extra config files...>");
-      return 1;
-    }
-    String table = args[c++];
-    String mrIncWorkingPathStr = args[c++];
-    String outputPathStr = args[c++];
-    String blurZkConnection = args[c++];
-    int reducerMultipler = Integer.parseInt(args[c++]);
-    for (; c < args.length; c++) {
-      String externalConfigFileToAdd = args[c];
-      getConf().addResource(new Path(externalConfigFileToAdd));
-    }
-
-    Path outputPath = new Path(outputPathStr);
-    Path mrIncWorkingPath = new Path(mrIncWorkingPathStr);
-    FileSystem fileSystem = mrIncWorkingPath.getFileSystem(getConf());
-
-    Path newData = new Path(mrIncWorkingPath, NEW);
-    Path inprogressData = new Path(mrIncWorkingPath, INPROGRESS);
-    Path completeData = new Path(mrIncWorkingPath, COMPLETE);
-    Path fileCache = new Path(mrIncWorkingPath, CACHE);
-    Path tmpPathDontDelete = new Path(mrIncWorkingPath, TMP);
-
-    Path tmpPath = new Path(tmpPathDontDelete, UUID.randomUUID().toString());
-
-    fileSystem.mkdirs(newData);
-    fileSystem.mkdirs(inprogressData);
-    fileSystem.mkdirs(completeData);
-    fileSystem.mkdirs(fileCache);
-
-    List<Path> srcPathList = new ArrayList<Path>();
-    for (FileStatus fileStatus : fileSystem.listStatus(newData)) {
-      srcPathList.add(fileStatus.getPath());
-    }
-    if (srcPathList.isEmpty()) {
-      return 0;
-    }
-
-    List<Path> inprogressPathList = new ArrayList<Path>();
-    boolean success = false;
-    Iface client = null;
-
-    EXEC exec = EXEC.valueOf(getConf().get(BLUR_EXEC_TYPE, EXEC.AUTOMATIC.name()).toUpperCase());
-
-    String uuid = UUID.randomUUID().toString();
-
-    try {
-      client = BlurClient.getClientFromZooKeeperConnectionStr(blurZkConnection);
-      TableDescriptor descriptor = client.describe(table);
-      Map<String, String> tableProperties = descriptor.getTableProperties();
-      String fastDir = tableProperties.get("blur.table.disable.fast.dir");
-      if (fastDir == null || !fastDir.equals("true")) {
-        LOG.error("Table [{0}] does not have blur.table.disable.fast.dir enabled, not supported in fast MR update.",
-            table);
-        return 1;
-      }
-
-      waitForOtherSnapshotsToBeRemoved(client, table, MRUPDATE_SNAPSHOT);
-      client.createSnapshot(table, MRUPDATE_SNAPSHOT);
-      TableStats tableStats = client.tableStats(table);
-
-      inprogressPathList = movePathList(fileSystem, inprogressData, srcPathList);
-
-      switch (exec) {
-      case MR_ONLY:
-        success = runMrOnly(descriptor, inprogressPathList, table, fileCache, outputPath, reducerMultipler);
-        break;
-      case MR_WITH_LOOKUP:
-        success = runMrWithLookup(uuid, descriptor, inprogressPathList, table, fileCache, outputPath, reducerMultipler,
-            tmpPath, tableStats, MRUPDATE_SNAPSHOT);
-        break;
-      case AUTOMATIC:
-        success = runAutomatic(uuid, descriptor, inprogressPathList, table, fileCache, outputPath, reducerMultipler,
-            tmpPath, tableStats, MRUPDATE_SNAPSHOT);
-        break;
-      default:
-        throw new RuntimeException("Exec type [" + exec + "] not supported.");
-      }
-    } finally {
-      if (success) {
-        LOG.info("Associate lookup cache with new data!");
-        associateLookupCache(uuid, fileCache, outputPath);
-        LOG.info("Indexing job succeeded!");
-        client.loadData(table, outputPathStr);
-        LOG.info("Load data called");
-        movePathList(fileSystem, completeData, inprogressPathList);
-        LOG.info("Input data moved to complete");
-        ClusterDriver.waitForDataToLoad(client, table);
-        LOG.info("Data loaded");
-      } else {
-        LOG.error("Indexing job failed!");
-        movePathList(fileSystem, newData, inprogressPathList);
-      }
-      fileSystem.delete(tmpPath, true);
-      if (client != null) {
-        client.removeSnapshot(table, MRUPDATE_SNAPSHOT);
-      }
-    }
-
-    if (success) {
-      return 0;
-    } else {
-      return 1;
-    }
-  }
-
-  private void associateLookupCache(String uuid, Path fileCache, Path outputPath) throws IOException {
-    FileSystem fileSystem = fileCache.getFileSystem(getConf());
-    cleanupExtraFileFromSpecX(fileSystem, uuid, fileCache);
-    associateLookupCache(fileSystem, uuid, fileSystem.getFileStatus(fileCache), outputPath);
-  }
-
-  private void cleanupExtraFileFromSpecX(FileSystem fileSystem, String uuid, Path fileCache) throws IOException {
-    FileStatus[] listStatus = fileSystem.listStatus(fileCache);
-    List<FileStatus> uuidPaths = new ArrayList<FileStatus>();
-    for (FileStatus fs : listStatus) {
-      Path path = fs.getPath();
-      if (fs.isDirectory()) {
-        cleanupExtraFileFromSpecX(fileSystem, uuid, path);
-      } else if (path.getName().startsWith(uuid)) {
-        uuidPaths.add(fs);
-      }
-    }
-    if (uuidPaths.size() > 1) {
-      deleteIncomplete(fileSystem, uuidPaths);
-    }
-  }
-
-  private void deleteIncomplete(FileSystem fileSystem, List<FileStatus> uuidPaths) throws IOException {
-    long max = 0;
-    FileStatus keeper = null;
-    for (FileStatus fs : uuidPaths) {
-      long len = fs.getLen();
-      if (len > max) {
-        keeper = fs;
-        max = len;
-      }
-    }
-    for (FileStatus fs : uuidPaths) {
-      if (fs != keeper) {
-        LOG.info("Deleting incomplete cache file [{0}]", fs.getPath());
-        fileSystem.delete(fs.getPath(), false);
-      }
-    }
-  }
-
-  private void associateLookupCache(FileSystem fileSystem, String uuid, FileStatus fileCache, Path outputPath)
-      throws IOException {
-    Path path = fileCache.getPath();
-    if (fileCache.isDirectory()) {
-      FileStatus[] listStatus = fileSystem.listStatus(path);
-      for (FileStatus fs : listStatus) {
-        associateLookupCache(fileSystem, uuid, fs, outputPath);
-      }
-    } else if (path.getName().startsWith(uuid)) {
-      Path parent = path.getParent();
-      String shardName = parent.getName();
-      Path indexPath = findOutputDirPath(outputPath, shardName);
-      LOG.info("Path found for shard [{0}] outputPath [{1}]", shardName, outputPath);
-      String id = MergeSortRowIdMatcher.getIdForSingleSegmentIndex(getConf(), indexPath);
-      Path file = new Path(path.getParent(), id + ".seq");
-      MergeSortRowIdMatcher.commitWriter(getConf(), file, path);
-    }
-  }
-
-  private Path findOutputDirPath(Path outputPath, String shardName) throws IOException {
-    FileSystem fileSystem = outputPath.getFileSystem(getConf());
-    Path shardPath = new Path(outputPath, shardName);
-    if (!fileSystem.exists(shardPath)) {
-      throw new IOException("Shard path [" + shardPath + "] does not exist.");
-    }
-    FileStatus[] listStatus = fileSystem.listStatus(shardPath, new PathFilter() {
-      @Override
-      public boolean accept(Path path) {
-        return path.getName().endsWith(".commit");
-      }
-    });
-    if (listStatus.length == 1) {
-      FileStatus fs = listStatus[0];
-      return fs.getPath();
-    } else {
-      throw new IOException("Expected exactly one commit dir under [" + shardPath + "] but found ["
          + listStatus.length + "]");
-    }
-  }
-
-  private boolean runAutomatic(String uuid, TableDescriptor descriptor, List<Path> inprogressPathList, String table,
-      Path fileCache, Path outputPath, int reducerMultipler, Path tmpPath, TableStats tableStats, String snapshot)
-      throws ClassNotFoundException, IOException, InterruptedException {
-    PartitionedInputResult result = buildPartitionedInputData(uuid, tmpPath, descriptor, inprogressPathList, snapshot,
-        fileCache);
-
-    Job job = Job.getInstance(getConf(), "Blur Row Updater for table [" + table + "]");
-
-    InputSplitPruneUtil.setBlurLookupRowIdFromNewDataCounts(job, table, result._rowIdsFromNewData);
-    InputSplitPruneUtil.setBlurLookupRowIdUpdateFromNewDataCounts(job, table, result._rowIdsToUpdateFromNewData);
-    InputSplitPruneUtil.setBlurLookupRowIdFromIndexCounts(job, table, result._rowIdsFromIndex);
-    InputSplitPruneUtil.setTable(job, table);
-
-    BlurInputFormat.setLocalCachePath(job, fileCache);
-
-    // Existing data - this input opens the existing shard indexes and streams
-    // through all documents.
-    {
-      Path tablePath = new Path(descriptor.getTableUri());
-      BlurInputFormat.addTable(job, descriptor, MRUPDATE_SNAPSHOT);
-      MultipleInputs.addInputPath(job, tablePath, PrunedBlurInputFormat.class, MapperForExistingDataMod.class);
-    }
-
-    // Existing data - this input adds the row id lookup.
-    {
-      MapperForExistingDataWithIndexLookup.setSnapshot(job, MRUPDATE_SNAPSHOT);
-      FileInputFormat.addInputPath(job, result._partitionedInputData);
-      MultipleInputs.addInputPath(job, result._partitionedInputData, PrunedSequenceFileInputFormat.class,
-          MapperForExistingDataWithIndexLookup.class);
-    }
-
-    // New data
-    for (Path p : inprogressPathList) {
-      FileInputFormat.addInputPath(job, p);
-      MultipleInputs.addInputPath(job, p, SequenceFileInputFormat.class, MapperForNewDataMod.class);
-    }
-
-    BlurOutputFormat.setOutputPath(job, outputPath);
-    BlurOutputFormat.setupJob(job, descriptor);
-
-    job.setReducerClass(UpdateReducer.class);
-    job.setMapOutputKeyClass(IndexKey.class);
-    job.setMapOutputValueClass(IndexValue.class);
-    job.setPartitionerClass(IndexKeyPartitioner.class);
-    job.setGroupingComparatorClass(IndexKeyWritableComparator.class);
-
-    BlurOutputFormat.setReducerMultiplier(job, reducerMultipler);
-
-    boolean success = job.waitForCompletion(true);
-    Counters counters = job.getCounters();
-    LOG.info("Counters [" + counters + "]");
-    return success;
-  }
-
-  private boolean runMrWithLookup(String uuid, TableDescriptor descriptor, List<Path> inprogressPathList, String table,
-      Path fileCache, Path outputPath, int reducerMultipler, Path tmpPath, TableStats tableStats, String snapshot)
-      throws ClassNotFoundException, IOException, InterruptedException {
-    PartitionedInputResult result = buildPartitionedInputData(uuid, tmpPath, descriptor, inprogressPathList, snapshot,
-        fileCache);
-
-    Job job = Job.getInstance(getConf(), "Blur Row Updater for table [" + table + "]");
-
-    MapperForExistingDataWithIndexLookup.setSnapshot(job, MRUPDATE_SNAPSHOT);
-    FileInputFormat.addInputPath(job, result._partitionedInputData);
-    MultipleInputs.addInputPath(job, result._partitionedInputData, SequenceFileInputFormat.class,
-        MapperForExistingDataWithIndexLookup.class);
-
-    for (Path p : inprogressPathList) {
-      FileInputFormat.addInputPath(job, p);
-      MultipleInputs.addInputPath(job, p, SequenceFileInputFormat.class, MapperForNewDataMod.class);
-    }
-
-    BlurOutputFormat.setOutputPath(job, outputPath);
-    BlurOutputFormat.setupJob(job, descriptor);
-
-    job.setReducerClass(UpdateReducer.class);
-    job.setMapOutputKeyClass(IndexKey.class);
-    job.setMapOutputValueClass(IndexValue.class);
-    job.setPartitionerClass(IndexKeyPartitioner.class);
-    job.setGroupingComparatorClass(IndexKeyWritableComparator.class);
-
-    BlurOutputFormat.setReducerMultiplier(job, reducerMultipler);
-
-    boolean success = job.waitForCompletion(true);
-    Counters counters = job.getCounters();
-    LOG.info("Counters [" + counters + "]");
-    return success;
-  }
-
-  private boolean runMrOnly(TableDescriptor descriptor, List<Path> inprogressPathList, String table, Path fileCache,
-      Path outputPath, int reducerMultipler) throws IOException, ClassNotFoundException, InterruptedException {
-    Job job = Job.getInstance(getConf(), "Blur Row Updater for table [" + table + "]");
-    Path tablePath = new Path(descriptor.getTableUri());
-    BlurInputFormat.setLocalCachePath(job, fileCache);
-    BlurInputFormat.addTable(job, descriptor, MRUPDATE_SNAPSHOT);
-    MultipleInputs.addInputPath(job, tablePath, BlurInputFormat.class, MapperForExistingDataMod.class);
-
-    for (Path p : inprogressPathList) {
-      FileInputFormat.addInputPath(job, p);
-      MultipleInputs.addInputPath(job, p, SequenceFileInputFormat.class, MapperForNewDataMod.class);
-    }
-
-    BlurOutputFormat.setOutputPath(job, outputPath);
-    BlurOutputFormat.setupJob(job, descriptor);
-
-    job.setReducerClass(UpdateReducer.class);
-    job.setMapOutputKeyClass(IndexKey.class);
-    job.setMapOutputValueClass(IndexValue.class);
-    job.setPartitionerClass(IndexKeyPartitioner.class);
-    job.setGroupingComparatorClass(IndexKeyWritableComparator.class);
-
-    BlurOutputFormat.setReducerMultiplier(job, reducerMultipler);
-
-    boolean success = job.waitForCompletion(true);
-    Counters counters = job.getCounters();
-    LOG.info("Counters [" + counters + "]");
-    return success;
-  }
-
-  private PartitionedInputResult buildPartitionedInputData(String uuid, Path tmpPath, TableDescriptor descriptor,
-      List<Path> inprogressPathList, String snapshot, Path fileCachePath) throws IOException, ClassNotFoundException,
-      InterruptedException {
-    Job job = Job.getInstance(getConf(), "Partitioning data for table [" + descriptor.getName() + "]");
-    job.getConfiguration().set(BLUR_UPDATE_ID, uuid);
-
-    // Needed for the bloom filter path information.
-    BlurOutputFormat.setTableDescriptor(job, descriptor);
-    BlurInputFormat.setLocalCachePath(job, fileCachePath);
-    MapperForExistingDataWithIndexLookup.setSnapshot(job, snapshot);
-
-    for (Path p : inprogressPathList) {
-      FileInputFormat.addInputPath(job, p);
-    }
-    Path outputPath = new Path(tmpPath, UUID.randomUUID().toString());
-    job.setJarByClass(getClass());
-    job.setMapperClass(LookupBuilderMapper.class);
-    job.setReducerClass(LookupBuilderReducer.class);
-
-    int shardCount = descriptor.getShardCount();
-    job.setNumReduceTasks(shardCount);
-    job.setInputFormatClass(SequenceFileInputFormat.class);
-    job.setMapOutputKeyClass(Text.class);
-    job.setMapOutputValueClass(NullWritable.class);
-    job.setOutputFormatClass(SequenceFileOutputFormat.class);
-    job.setOutputKeyClass(Text.class);
-    job.setOutputValueClass(BooleanWritable.class);
-    FileOutputFormat.setOutputPath(job, outputPath);
-    if (job.waitForCompletion(true)) {
-      return new PartitionedInputResult(outputPath, job.getCounters(), shardCount, job.getTaskReports(TaskType.REDUCE));
-    } else {
-      throw new IOException("Partitioning failed!");
-    }
-  }
-
-  private void waitForOtherSnapshotsToBeRemoved(Iface client, String table, String snapshot) throws BlurException,
-      TException, InterruptedException {
-    while (true) {
-      Map<String, List<String>> listSnapshots = client.listSnapshots(table);
-      boolean mrupdateSnapshots = false;
-      for (Entry<String, List<String>> e : listSnapshots.entrySet()) {
-        List<String> value = e.getValue();
-        if (value.contains(snapshot)) {
-          mrupdateSnapshots = true;
-        }
-      }
-      if (!mrupdateSnapshots) {
-        return;
-      } else {
-        LOG.info("Snapshot [{0}] for table [{1}] already exists", snapshot, table);
-        Thread.sleep(TimeUnit.SECONDS.toMillis(10));
-        LOG.info("Retrying");
-      }
-    }
-  }
-
-  private List<Path> movePathList(FileSystem fileSystem, Path dstDir, List<Path> lst) throws IOException {
-    List<Path> result = new ArrayList<Path>();
-    for (Path src : lst) {
-      Path dst = new Path(dstDir, src.getName());
-      if (fileSystem.rename(src, dst)) {
-        LOG.info("Moving [{0}] to [{1}]", src, dst);
-        result.add(dst);
-      } else {
-        LOG.error("Could not move [{0}] to [{1}]", src, dst);
-      }
-    }
-    return result;
-  }
-
-}
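A hypothetical launch of the driver above, with the argument order taken from its usage string and the exec mode forced through the "blur.exec.type" key (the table name, paths, and ZooKeeper string are placeholders, not values from this commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ToolRunner;

    // Sketch only: forcing MR_WITH_LOOKUP instead of the default AUTOMATIC
    // plan when invoking FasterDriver through ToolRunner.
    public class FasterDriverLaunchSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("blur.exec.type", "MR_WITH_LOOKUP");
        int rc = ToolRunner.run(conf, new FasterDriver(), new String[] {
            "testtable", "/blur/mrinc/testtable", "/blur/tables/testtable/import/import_1",
            "zk1:2181/blur", "2" });
        System.exit(rc);
      }
    }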
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/98359a40/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/HdfsConfigurationNamespaceMerge.java
----------------------------------------------------------------------
diff --git a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/HdfsConfigurationNamespaceMerge.java b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/HdfsConfigurationNamespaceMerge.java
deleted file mode 100644
index de96d24..0000000
--- a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/HdfsConfigurationNamespaceMerge.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.mapreduce.lib.update;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-public class HdfsConfigurationNamespaceMerge {
-
-  private static final String DFS_NAMESERVICES = "dfs.nameservices";
-  private static final Log LOG = LogFactory.getLog(HdfsConfigurationNamespaceMerge.class);
-
-  public static void main(String[] args) throws IOException {
-    Path p = new Path("./src/main/scripts/conf/hdfs");
-
-    Configuration configuration = mergeHdfsConfigs(p.getFileSystem(new Configuration()), p);
-
-    // configuration.writeXml(System.out);
-
-    Collection<String> nameServices = configuration.getStringCollection(DFS_NAMESERVICES);
-    for (String name : nameServices) {
-      Path path = new Path("hdfs://" + name + "/");
-      FileSystem fileSystem = path.getFileSystem(configuration);
-      FileStatus[] listStatus = fileSystem.listStatus(path);
-      for (FileStatus fileStatus : listStatus) {
-        System.out.println(fileStatus.getPath());
-      }
-    }
-  }
-
-  private static boolean checkHostName(String host) {
-    try {
-      InetAddress.getAllByName(host);
-      return true;
-    } catch (UnknownHostException e) {
-      LOG.warn("Host not found [" + host + "]");
-      return false;
-    }
-  }
-
-  public static Configuration mergeHdfsConfigs(FileSystem fs, Path p) throws IOException {
-    List<Configuration> configList = new ArrayList<Configuration>();
-    gatherConfigs(fs, p, configList);
-    return merge(configList);
-  }
-
-  public static Configuration merge(List<Configuration> configList) throws IOException {
-    Configuration merge = new Configuration(false);
-    Set<String> nameServices = new HashSet<String>();
-    for (Configuration configuration : configList) {
-      String nameService = configuration.get(DFS_NAMESERVICES);
-      if (nameServices.contains(nameService)) {
-        throw new IOException("Multiple confs define namespace [" + nameService + "]");
-      }
-      nameServices.add(nameService);
-      if (shouldAdd(configuration, nameService)) {
-        for (Entry<String, String> e : configuration) {
-          String key = e.getKey();
-          if (key.contains(nameService)) {
-            String value = e.getValue();
-            merge.set(key, value);
-          }
-        }
-      }
-    }
-    merge.set(DFS_NAMESERVICES, StringUtils.join(nameServices, ","));
-    return merge;
-  }
-
-  private static boolean shouldAdd(Configuration configuration, String nameService) {
-    for (Entry<String, String> e : configuration) {
-      String key = e.getKey();
-      if (key.contains(nameService) && key.startsWith("dfs.namenode.rpc-address.")) {
-        return checkHostName(getHost(e.getValue()));
-      }
-    }
-    return false;
-  }
-
-  private static String getHost(String host) {
-    return host.substring(0, host.indexOf(":"));
-  }
-
-  public static void gatherConfigs(FileSystem fs, Path p, List<Configuration> configList) throws IOException {
-    if (fs.isFile(p)) {
-      if (p.getName().endsWith(".xml")) {
-        LOG.info("Loading file [" + p + "]");
-        Configuration configuration = new Configuration(false);
-        configuration.addResource(p);
-        configList.add(configuration);
-      } else {
-        LOG.info("Skipping file [" + p + "]");
-      }
-    } else {
-      FileStatus[] listStatus = fs.listStatus(p);
-      for (FileStatus fileStatus : listStatus) {
-        gatherConfigs(fs, fileStatus.getPath(), configList);
-      }
-    }
-  }
-
-}
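
The class removed above merged per-nameservice HDFS configs (one *.xml file per namespace) into a single Configuration, keeping only the keys that mention a nameservice whose namenode host resolves, and rebuilding the combined dfs.nameservices list. A minimal sketch of how it was driven, with a hypothetical conf directory (only mergeHdfsConfigs comes from the deleted class):

    // Hedged sketch; the directory path is illustrative, not from the commit.
    Path confDir = new Path("/etc/hadoop/conf-namespaces");
    FileSystem fs = confDir.getFileSystem(new Configuration());
    Configuration merged = HdfsConfigurationNamespaceMerge.mergeHdfsConfigs(fs, confDir);
    // "merged" now holds the union of per-nameservice keys plus a combined
    // "dfs.nameservices" value, ready to resolve hdfs://<nameservice>/ paths.
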
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/98359a40/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/InputSplitPruneUtil.java
----------------------------------------------------------------------
diff --git a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/InputSplitPruneUtil.java b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/InputSplitPruneUtil.java
deleted file mode 100644
index 80d1410..0000000
--- a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/InputSplitPruneUtil.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.mapreduce.lib.update;
-
-import org.apache.blur.utils.ShardUtil;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-
-public class InputSplitPruneUtil {
-
-  private static final String BLUR_LOOKUP_ROWID_UPDATE_FROM_NEW_DATA_COUNT_PREFIX = "blur.lookup.rowid.update.from.new.data.count";
-  private static final String BLUR_LOOKUP_ROWID_FROM_NEW_DATA_COUNT_PREFIX = "blur.lookup.rowid.from.new.data.count.";
-  private static final String BLUR_LOOKUP_ROWID_FROM_INDEX_COUNT_PREFIX = "blur.lookup.rowid.from.index.count.";
-
-  private static final String BLUR_LOOKUP_TABLE = "blur.lookup.table";
-  private static final String BLUR_LOOKUP_RATIO_PER_SHARD = "blur.lookup.ratio.per.shard";
-  private static final String BLUR_LOOKUP_MAX_TOTAL_PER_SHARD = "blur.lookup.max.total.per.shard";
-
-  private static final double DEFAULT_LOOKUP_RATIO = 0.5;
-  private static final long DEFAULT_LOOKUP_MAX_TOTAL = Long.MAX_VALUE;
-
-  public static boolean shouldLookupExecuteOnShard(Configuration configuration, String table, int shard) {
-    double lookupRatio = getLookupRatio(configuration);
-    long maxLookupCount = getMaxLookupCount(configuration);
-    long rowIdFromNewDataCount = getBlurLookupRowIdFromNewDataCount(configuration, table, shard);
-    long rowIdUpdateFromNewDataCount = getBlurLookupRowIdUpdateFromNewDataCount(configuration, table, shard);
-    long rowIdFromIndexCount = getBlurLookupRowIdFromIndexCount(configuration, table, shard);
-    return shouldLookupRun(rowIdFromIndexCount, rowIdFromNewDataCount, rowIdUpdateFromNewDataCount, lookupRatio,
-        maxLookupCount);
-  }
-
-  private static boolean shouldLookupRun(long rowIdFromIndexCount, long rowIdFromNewDataCount,
-      long rowIdUpdateFromNewDataCount, double lookupRatio, long maxLookupCount) {
-    if (rowIdUpdateFromNewDataCount > maxLookupCount) {
-      return false;
-    }
-    double d = (double) rowIdUpdateFromNewDataCount / (double) rowIdFromIndexCount;
-    if (d <= lookupRatio) {
-      return true;
-    }
-    return false;
-  }
-
-  public static double getLookupRatio(Configuration configuration) {
-    return configuration.getDouble(BLUR_LOOKUP_RATIO_PER_SHARD, DEFAULT_LOOKUP_RATIO);
-  }
-
-  private static long getMaxLookupCount(Configuration configuration) {
-    return configuration.getLong(BLUR_LOOKUP_MAX_TOTAL_PER_SHARD, DEFAULT_LOOKUP_MAX_TOTAL);
-  }
-
-  public static void setTable(Job job, String table) {
-    setTable(job.getConfiguration(), table);
-  }
-
-  public static void setTable(Configuration configuration, String table) {
-    configuration.set(BLUR_LOOKUP_TABLE, table);
-  }
-
-  public static String getTable(Configuration configuration) {
-    return configuration.get(BLUR_LOOKUP_TABLE);
-  }
-
-  public static String getBlurLookupRowIdFromIndexCountName(String table) {
-    return BLUR_LOOKUP_ROWID_FROM_INDEX_COUNT_PREFIX + table;
-  }
-
-  public static String getBlurLookupRowIdFromNewDataCountName(String table) {
-    return BLUR_LOOKUP_ROWID_FROM_NEW_DATA_COUNT_PREFIX + table;
-  }
-
-  public static String getBlurLookupRowIdUpdateFromNewDataCountName(String table) {
-    return BLUR_LOOKUP_ROWID_UPDATE_FROM_NEW_DATA_COUNT_PREFIX + table;
-  }
-
-  public static long getBlurLookupRowIdUpdateFromNewDataCount(Configuration configuration, String table, int shard) {
-    String[] strings = configuration.getStrings(getBlurLookupRowIdUpdateFromNewDataCountName(table));
-    return getCount(strings, shard);
-  }
-
-  public static long getBlurLookupRowIdFromNewDataCount(Configuration configuration, String table, int shard) {
-    String[] strings = configuration.getStrings(getBlurLookupRowIdFromNewDataCountName(table));
-    return getCount(strings, shard);
-  }
-
-  public static long getBlurLookupRowIdFromIndexCount(Configuration configuration, String table, int shard) {
-    String[] strings = configuration.getStrings(getBlurLookupRowIdFromIndexCountName(table));
-    return getCount(strings, shard);
-  }
-
-  public static void setBlurLookupRowIdFromNewDataCounts(Job job, String table, long[] counts) {
-    setBlurLookupRowIdFromNewDataCounts(job.getConfiguration(), table, counts);
-  }
-
-  public static void setBlurLookupRowIdFromNewDataCounts(Configuration configuration, String table, long[] counts) {
-    configuration.setStrings(getBlurLookupRowIdFromNewDataCountName(table), toStrings(counts));
-  }
-
-  public static void setBlurLookupRowIdUpdateFromNewDataCounts(Job job, String table, long[] counts) {
-    setBlurLookupRowIdUpdateFromNewDataCounts(job.getConfiguration(), table, counts);
-  }
-
-  public static void setBlurLookupRowIdUpdateFromNewDataCounts(Configuration configuration, String table, long[] counts) {
-    configuration.setStrings(getBlurLookupRowIdUpdateFromNewDataCountName(table), toStrings(counts));
-  }
-
-  public static void setBlurLookupRowIdFromIndexCounts(Job job, String table, long[] counts) {
-    setBlurLookupRowIdFromIndexCounts(job.getConfiguration(), table, counts);
-  }
-
-  public static void setBlurLookupRowIdFromIndexCounts(Configuration configuration, String table, long[] counts) {
-    configuration.setStrings(getBlurLookupRowIdFromIndexCountName(table), toStrings(counts));
-  }
-
-  public static long getCount(String[] strings, int shard) {
-    return Long.parseLong(strings[shard]);
-  }
-
-  public static int getShardFromDirectoryPath(Path path) {
-    return ShardUtil.getShardIndex(path.getName());
-  }
-
-  public static String[] toStrings(long[] counts) {
-    if (counts == null) {
-      return null;
-    }
-    String[] strs = new String[counts.length];
-    for (int i = 0; i < counts.length; i++) {
-      strs[i] = Long.toString(counts[i]);
-    }
-    return strs;
-  }
-
-}
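
The pruning utility removed above decides, per shard, whether the index-lookup path is worth running: it compares the number of rowIds being updated against the number already in the shard's index, and bails out when the ratio exceeds blur.lookup.ratio.per.shard (default 0.5) or the update count exceeds blur.lookup.max.total.per.shard. A worked sketch with fabricated counts (all three count arrays are set because shouldLookupExecuteOnShard reads each one):

    // Hedged sketch; "table1" and the counts are made up.
    Configuration conf = job.getConfiguration();
    InputSplitPruneUtil.setTable(conf, "table1");
    InputSplitPruneUtil.setBlurLookupRowIdFromIndexCounts(conf, "table1", new long[] { 1000000L, 2000000L });
    InputSplitPruneUtil.setBlurLookupRowIdFromNewDataCounts(conf, "table1", new long[] { 500L, 1950000L });
    InputSplitPruneUtil.setBlurLookupRowIdUpdateFromNewDataCounts(conf, "table1", new long[] { 100L, 1900000L });

    // Shard 0: 100 / 1000000 = 0.0001 <= 0.5, so the merge-sort lookup runs.
    boolean lookup0 = InputSplitPruneUtil.shouldLookupExecuteOnShard(conf, "table1", 0); // true
    // Shard 1: 1900000 / 2000000 = 0.95 > 0.5, so rewriting the shard is cheaper.
    boolean lookup1 = InputSplitPruneUtil.shouldLookupExecuteOnShard(conf, "table1", 1); // false
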
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/98359a40/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/LookupBuilderMapper.java
----------------------------------------------------------------------
diff --git a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/LookupBuilderMapper.java b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/LookupBuilderMapper.java
deleted file mode 100644
index 87a3a32..0000000
--- a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/LookupBuilderMapper.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.mapreduce.lib.update;
-
-import java.io.IOException;
-
-import org.apache.blur.mapreduce.lib.BlurRecord;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Mapper;
-
-public class LookupBuilderMapper extends Mapper<Text, BlurRecord, Text, NullWritable> {
-
-  @Override
-  protected void map(Text key, BlurRecord value, Mapper<Text, BlurRecord, Text, NullWritable>.Context context)
-      throws IOException, InterruptedException {
-    context.write(new Text(value.getRowId()), NullWritable.get());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/98359a40/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/LookupBuilderReducer.java
----------------------------------------------------------------------
diff --git a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/LookupBuilderReducer.java b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/LookupBuilderReducer.java
deleted file mode 100644
index f3a2697..0000000
--- a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/LookupBuilderReducer.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.mapreduce.lib.update;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.blur.BlurConfiguration;
-import org.apache.blur.manager.BlurPartitioner;
-import org.apache.blur.manager.writer.SnapshotIndexDeletionPolicy;
-import org.apache.blur.mapreduce.lib.BlurInputFormat;
-import org.apache.blur.mapreduce.lib.BlurOutputFormat;
-import org.apache.blur.mapreduce.lib.update.MergeSortRowIdMatcher.Action;
-import org.apache.blur.store.BlockCacheDirectoryFactoryV2;
-import org.apache.blur.store.hdfs.HdfsDirectory;
-import org.apache.blur.thrift.generated.TableDescriptor;
-import org.apache.blur.utils.BlurConstants;
-import org.apache.blur.utils.ShardUtil;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile.Writer;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.lucene.index.AtomicReader;
-import org.apache.lucene.index.AtomicReaderContext;
-import org.apache.lucene.index.DirectoryReader;
-import org.apache.lucene.index.IndexCommit;
-import org.apache.lucene.index.Terms;
-import org.apache.lucene.store.Directory;
-
-import com.google.common.io.Closer;
-
-public class LookupBuilderReducer extends Reducer<Text, NullWritable, Text, BooleanWritable> {
-
-  public static final String BLUR_CACHE_DIR_TOTAL_BYTES = "blur.cache.dir.total.bytes";
-  private Counter _rowIds;
-  private Counter _rowIdsToUpdate;
-
-  private MergeSortRowIdMatcher _matcher;
-  private int _numberOfShardsInTable;
-  private Configuration _configuration;
-  private String _snapshot;
-  private Path _tablePath;
-  private Counter _rowIdsFromIndex;
-  private long _totalNumberOfBytes;
-  private Action _action;
-  private Closer _closer;
-  private Path _cachePath;
-  private String _table;
-  private Writer _writer;
-
-  @Override
-  protected void setup(Reducer<Text, NullWritable, Text, BooleanWritable>.Context context) throws IOException,
-      InterruptedException {
-    _configuration = context.getConfiguration();
-    _rowIds = context.getCounter(BlurIndexCounter.ROW_IDS_FROM_NEW_DATA);
-    _rowIdsToUpdate = context.getCounter(BlurIndexCounter.ROW_IDS_TO_UPDATE_FROM_NEW_DATA);
-    _rowIdsFromIndex = context.getCounter(BlurIndexCounter.ROW_IDS_FROM_INDEX);
-    TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(_configuration);
-    _numberOfShardsInTable = tableDescriptor.getShardCount();
-    _tablePath = new Path(tableDescriptor.getTableUri());
-    _snapshot = MapperForExistingDataWithIndexLookup.getSnapshot(_configuration);
-    _totalNumberOfBytes = _configuration.getLong(BLUR_CACHE_DIR_TOTAL_BYTES, 128 * 1024 * 1024);
-    _cachePath = BlurInputFormat.getLocalCachePath(_configuration);
-    _table = tableDescriptor.getName();
-    _closer = Closer.create();
-  }
-
-  @Override
-  protected void reduce(Text rowId, Iterable<NullWritable> nothing,
-      Reducer<Text, NullWritable, Text, BooleanWritable>.Context context) throws IOException, InterruptedException {
-    if (_matcher == null) {
-      _matcher = getMergeSortRowIdMatcher(rowId, context);
-    }
-    if (_writer == null) {
-      _writer = getRowIdWriter(rowId, context);
-    }
-    _writer.append(rowId, NullWritable.get());
-    _rowIds.increment(1);
-    if (_action == null) {
-      _action = new Action() {
-        @Override
-        public void found(Text rowId) throws IOException {
-          _rowIdsToUpdate.increment(1);
-          try {
-            context.write(rowId, new BooleanWritable(true));
-          } catch (InterruptedException e) {
-            throw new IOException(e);
-          }
-        }
-      };
-    }
-    _matcher.lookup(rowId, _action);
-  }
-
-  private Writer getRowIdWriter(Text rowId, Reducer<Text, NullWritable, Text, BooleanWritable>.Context context)
-      throws IOException {
-    BlurPartitioner blurPartitioner = new BlurPartitioner();
-    int shard = blurPartitioner.getShard(rowId, _numberOfShardsInTable);
-    String shardName = ShardUtil.getShardName(shard);
-    Path cachePath = MergeSortRowIdMatcher.getCachePath(_cachePath, _table, shardName);
-    Configuration configuration = context.getConfiguration();
-    String uuid = configuration.get(FasterDriver.BLUR_UPDATE_ID);
-    Path tmpPath = new Path(cachePath, uuid + "_" + getAttemptString(context));
-    return _closer.register(MergeSortRowIdMatcher.createWriter(_configuration, tmpPath));
-  }
-
-  private String getAttemptString(Reducer<Text, NullWritable, Text, BooleanWritable>.Context context) {
-    TaskAttemptID taskAttemptID = context.getTaskAttemptID();
-    return taskAttemptID.toString();
-  }
-
-  @Override
-  protected void cleanup(Reducer<Text, NullWritable, Text, BooleanWritable>.Context context) throws IOException,
-      InterruptedException {
-    _closer.close();
-  }
-
-  private MergeSortRowIdMatcher getMergeSortRowIdMatcher(Text rowId,
-      Reducer<Text, NullWritable, Text, BooleanWritable>.Context context) throws IOException {
-    BlurPartitioner blurPartitioner = new BlurPartitioner();
-    int shard = blurPartitioner.getShard(rowId, _numberOfShardsInTable);
-    String shardName = ShardUtil.getShardName(shard);
-
-    Path shardPath = new Path(_tablePath, shardName);
-    HdfsDirectory hdfsDirectory = new HdfsDirectory(_configuration, shardPath);
-    SnapshotIndexDeletionPolicy policy = new SnapshotIndexDeletionPolicy(_configuration,
-        SnapshotIndexDeletionPolicy.getGenerationsPath(shardPath));
-    Long generation = policy.getGeneration(_snapshot);
-    if (generation == null) {
-      hdfsDirectory.close();
-      throw new IOException("Snapshot [" + _snapshot + "] not found in shard [" + shardPath + "]");
-    }
-
-    BlurConfiguration bc = new BlurConfiguration();
-    BlockCacheDirectoryFactoryV2 blockCacheDirectoryFactoryV2 = new BlockCacheDirectoryFactoryV2(bc,
-        _totalNumberOfBytes);
-    _closer.register(blockCacheDirectoryFactoryV2);
-    Directory dir = blockCacheDirectoryFactoryV2.newDirectory("table", "shard", hdfsDirectory, null);
-    List<IndexCommit> listCommits = DirectoryReader.listCommits(dir);
-    IndexCommit indexCommit = MapperForExistingDataWithIndexLookup.findIndexCommit(listCommits, generation, shardPath);
-    DirectoryReader reader = DirectoryReader.open(indexCommit);
-    _rowIdsFromIndex.setValue(getTotalNumberOfRowIds(reader));
-
-    Path cachePath = MergeSortRowIdMatcher.getCachePath(_cachePath, _table, shardName);
-    return new MergeSortRowIdMatcher(dir, generation, _configuration, cachePath, context);
-  }
-
-  private long getTotalNumberOfRowIds(DirectoryReader reader) throws IOException {
-    long total = 0;
-    List<AtomicReaderContext> leaves = reader.leaves();
-    for (AtomicReaderContext context : leaves) {
-      AtomicReader atomicReader = context.reader();
-      Terms terms = atomicReader.terms(BlurConstants.ROW_ID);
-      long expectedInsertions = terms.size();
-      if (expectedInsertions < 0) {
-        return -1;
-      }
-      total += expectedInsertions;
-    }
-    return total;
-  }
-}
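
The reducer deleted above drives MergeSortRowIdMatcher through a callback: every incoming rowId is appended to a per-shard SequenceFile, and only rowIds the matcher finds in the shard's snapshot are emitted with a true flag, marking them for the update path. The callback shape, mirroring the diff (the sink used here is illustrative, not Blur API):

    // Hedged sketch of the lookup callback protocol used in reduce() above.
    MergeSortRowIdMatcher.Action markForUpdate = new MergeSortRowIdMatcher.Action() {
      @Override
      public void found(Text rowId) throws IOException {
        // Called only when rowId already exists in the shard's index snapshot.
        existingRowIds.add(new Text(rowId)); // illustrative sink
      }
    };
    matcher.lookup(rowId, markForUpdate);    // silent no-op when rowId is new
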
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/98359a40/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForExistingDataMod.java
----------------------------------------------------------------------
diff --git a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForExistingDataMod.java b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForExistingDataMod.java
deleted file mode 100644
index bf86e19..0000000
--- a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForExistingDataMod.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.mapreduce.lib.update;
-
-import java.io.IOException;
-
-import org.apache.blur.mapreduce.lib.BlurRecord;
-import org.apache.blur.mapreduce.lib.TableBlurRecord;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.Mapper;
-
-public class MapperForExistingDataMod extends Mapper<Text, TableBlurRecord, IndexKey, IndexValue> {
-
-  private Counter _existingRecords;
-
-  @Override
-  protected void setup(Context context) throws IOException, InterruptedException {
-    Counter counter = context.getCounter(BlurIndexCounter.INPUT_FORMAT_MAPPER);
-    counter.increment(1);
-    _existingRecords = context.getCounter(BlurIndexCounter.INPUT_FORMAT_EXISTING_RECORDS);
-  }
-
-  @Override
-  protected void map(Text key, TableBlurRecord value, Context context) throws IOException, InterruptedException {
-    BlurRecord blurRecord = value.getBlurRecord();
-    IndexKey oldDataKey = IndexKey.oldData(blurRecord.getRowId(), blurRecord.getRecordId());
-    context.write(oldDataKey, new IndexValue(blurRecord));
-    _existingRecords.increment(1L);
-  }
-
-}
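
The mapper deleted above feeds existing index data back through the update shuffle: each stored record is re-keyed as IndexKey.oldData(rowId, recordId), so old and new versions of the same record meet at the same reducer (with old data assumed to sort ahead of new data, per the oldData naming). A minimal sketch with a fabricated record, assuming BlurRecord exposes the usual setters alongside the getters used above:

    // Hedged sketch; the record values are made up.
    BlurRecord record = new BlurRecord();
    record.setRowId("row-1");       // assumed setter, matching getRowId() above
    record.setRecordId("rec-1");    // assumed setter, matching getRecordId() above
    IndexKey oldKey = IndexKey.oldData(record.getRowId(), record.getRecordId());
    context.write(oldKey, new IndexValue(record)); // mirrors map() above
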