Yeah I think that's a good idea. Thanks Tim. Sent from my iPhone
On Feb 26, 2014, at 8:02 AM, Tim Williams <[email protected]> wrote: > On Sat, Feb 22, 2014 at 9:47 PM, <[email protected]> wrote: >> Adding a new feature to allow for shards to pull directly from a queue like >> interface. >> >> >> Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo >> Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/052c131e >> Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/052c131e >> Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/052c131e >> >> Branch: refs/heads/apache-blur-0.2 >> Commit: 052c131e92e0caefb0c513fe52098ad6c6e04d3a >> Parents: 31f23a3 >> Author: Aaron McCurry <[email protected]> >> Authored: Sat Feb 22 21:47:25 2014 -0500 >> Committer: Aaron McCurry <[email protected]> >> Committed: Sat Feb 22 21:47:25 2014 -0500 >> >> ---------------------------------------------------------------------- >> .../org/apache/blur/manager/IndexManager.java | 49 +----- >> .../manager/writer/BlurIndexSimpleWriter.java | 5 +- >> .../blur/manager/writer/MutatableAction.java | 53 +++++++ >> .../apache/blur/manager/writer/QueueReader.java | 125 +++++++++++++++ >> .../org/apache/blur/server/TableContext.java | 33 ++++ >> .../writer/BlurIndexSimpleWriterTest.java | 3 - >> .../writer/QueueReaderBasicInMemory.java | 49 ++++++ >> .../blur/manager/writer/QueueReaderTest.java | 158 +++++++++++++++++++ >> .../org/apache/blur/utils/BlurConstants.java | 4 + >> 9 files changed, 429 insertions(+), 50 deletions(-) >> ---------------------------------------------------------------------- >> >> >> http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/052c131e/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java >> ---------------------------------------------------------------------- >> diff --git >> a/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java >> b/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java >> index 093cd81..c8822d0 100644 >> --- a/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java >> +++ b/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java >> @@ -460,7 +460,8 @@ public class IndexManager { >> } >> String rowId = blurQuery.getRowId(); >> if (rowId != null) { >> - // reduce the index selection down to the only one that would >> contain the row. >> + // reduce the index selection down to the only one that would >> contain >> + // the row. >> Map<String, BlurIndex> map = new HashMap<String, BlurIndex>(); >> String shard = MutationHelper.getShardName(table, rowId, >> getNumberOfShards(table), _blurPartitioner); >> BlurIndex index = getBlurIndex(table, shard); >> @@ -1207,25 +1208,7 @@ public class IndexManager { >> } >> ShardContext shardContext = blurIndex.getShardContext(); >> final MutatableAction mutatableAction = new >> MutatableAction(shardContext); >> - for (int i = 0; i < mutations.size(); i++) { >> - RowMutation mutation = mutations.get(i); >> - RowMutationType type = mutation.rowMutationType; >> - switch (type) { >> - case REPLACE_ROW: >> - Row row = MutationHelper.getRowFromMutations(mutation.rowId, >> mutation.recordMutations); >> - mutatableAction.replaceRow(row); >> - break; >> - case UPDATE_ROW: >> - doUpdateRowMutation(mutation, mutatableAction); >> - break; >> - case DELETE_ROW: >> - mutatableAction.deleteRow(mutation.rowId); >> - break; >> - default: >> - throw new RuntimeException("Not supported [" + type + "]"); >> - } >> - } >> - >> + mutatableAction.mutate(mutations); >> return _mutateExecutor.submit(new Callable<Void>() { >> @Override >> public Void call() throws Exception { >> @@ -1253,32 +1236,6 @@ public class IndexManager { >> return map; >> } >> >> - private void doUpdateRowMutation(RowMutation mutation, MutatableAction >> mutatableAction) throws BlurException, >> - IOException { >> - String rowId = mutation.getRowId(); >> - >> - for (RecordMutation recordMutation : mutation.getRecordMutations()) { >> - RecordMutationType type = recordMutation.recordMutationType; >> - Record record = recordMutation.getRecord(); >> - switch (type) { >> - case DELETE_ENTIRE_RECORD: >> - mutatableAction.deleteRecord(rowId, record.getRecordId()); >> - break; >> - case APPEND_COLUMN_VALUES: >> - mutatableAction.appendColumns(rowId, record); >> - break; >> - case REPLACE_ENTIRE_RECORD: >> - mutatableAction.replaceRecord(rowId, record); >> - break; >> - case REPLACE_COLUMNS: >> - mutatableAction.replaceColumns(rowId, record); >> - break; >> - default: >> - throw new RuntimeException("Unsupported record mutation type [" + >> type + "]"); >> - } >> - } >> - } >> - >> private int getNumberOfShards(String table) { >> return getTableContext(table).getDescriptor().getShardCount(); >> } >> >> http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/052c131e/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java >> ---------------------------------------------------------------------- >> diff --git >> a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java >> >> b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java >> index f9dc1c8..306479b 100644 >> --- >> a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java >> +++ >> b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java >> @@ -67,6 +67,7 @@ public class BlurIndexSimpleWriter extends BlurIndex { >> private final AtomicReference<BlurIndexWriter> _writer = new >> AtomicReference<BlurIndexWriter>(); >> private final boolean _makeReaderExitable = true; >> private IndexImporter _indexImporter; >> + private QueueReader _queueReader; >> private final ReadWriteLock _lock = new ReentrantReadWriteLock(); >> private final Lock _writeLock = _lock.writeLock(); >> private final ReadWriteLock _indexRefreshLock = new >> ReentrantReadWriteLock(); >> @@ -134,6 +135,7 @@ public class BlurIndexSimpleWriter extends BlurIndex { >> >> private Thread getWriterOpener(ShardContext shardContext) { >> Thread thread = new Thread(new Runnable() { >> + >> @Override >> public void run() { >> try { >> @@ -142,6 +144,7 @@ public class BlurIndexSimpleWriter extends BlurIndex { >> _writer.notify(); >> } >> _indexImporter = new IndexImporter(BlurIndexSimpleWriter.this, >> _shardContext, TimeUnit.SECONDS, 10); >> + _queueReader = >> _tableContext.getQueueReader(BlurIndexSimpleWriter.this, _shardContext); >> } catch (IOException e) { >> LOG.error("Unknown error on index writer open.", e); >> } >> @@ -206,7 +209,7 @@ public class BlurIndexSimpleWriter extends BlurIndex { >> @Override >> public void close() throws IOException { >> _isClosed.set(true); >> - IOUtils.cleanup(LOG, _indexImporter, _writer.get(), _indexReader.get()); >> + IOUtils.cleanup(LOG, _indexImporter, _queueReader, _writer.get(), >> _indexReader.get()); >> } >> >> @Override >> >> http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/052c131e/blur-core/src/main/java/org/apache/blur/manager/writer/MutatableAction.java >> ---------------------------------------------------------------------- >> diff --git >> a/blur-core/src/main/java/org/apache/blur/manager/writer/MutatableAction.java >> >> b/blur-core/src/main/java/org/apache/blur/manager/writer/MutatableAction.java >> index c8f0dfd..2a7159b 100644 >> --- >> a/blur-core/src/main/java/org/apache/blur/manager/writer/MutatableAction.java >> +++ >> b/blur-core/src/main/java/org/apache/blur/manager/writer/MutatableAction.java >> @@ -33,11 +33,16 @@ import org.apache.blur.manager.IndexManager; >> import org.apache.blur.server.IndexSearcherClosable; >> import org.apache.blur.server.ShardContext; >> import org.apache.blur.server.TableContext; >> +import org.apache.blur.thrift.MutationHelper; >> import org.apache.blur.thrift.generated.Column; >> import org.apache.blur.thrift.generated.FetchResult; >> import org.apache.blur.thrift.generated.FetchRowResult; >> import org.apache.blur.thrift.generated.Record; >> +import org.apache.blur.thrift.generated.RecordMutation; >> +import org.apache.blur.thrift.generated.RecordMutationType; >> import org.apache.blur.thrift.generated.Row; >> +import org.apache.blur.thrift.generated.RowMutation; >> +import org.apache.blur.thrift.generated.RowMutationType; >> import org.apache.blur.thrift.generated.Selector; >> import org.apache.blur.utils.BlurConstants; >> import org.apache.blur.utils.RowDocumentUtil; >> @@ -376,4 +381,52 @@ public class MutatableAction extends IndexAction { >> >> } >> >> + public void mutate(RowMutation mutation) { >> + RowMutationType type = mutation.rowMutationType; >> + switch (type) { >> + case REPLACE_ROW: >> + Row row = MutationHelper.getRowFromMutations(mutation.rowId, >> mutation.recordMutations); >> + replaceRow(row); >> + break; >> + case UPDATE_ROW: >> + doUpdateRowMutation(mutation, this); >> + break; >> + case DELETE_ROW: >> + deleteRow(mutation.rowId); >> + break; >> + default: >> + throw new RuntimeException("Not supported [" + type + "]"); >> + } >> + } >> + >> + private void doUpdateRowMutation(RowMutation mutation, MutatableAction >> mutatableAction) { >> + String rowId = mutation.getRowId(); >> + for (RecordMutation recordMutation : mutation.getRecordMutations()) { >> + RecordMutationType type = recordMutation.recordMutationType; >> + Record record = recordMutation.getRecord(); >> + switch (type) { >> + case DELETE_ENTIRE_RECORD: >> + mutatableAction.deleteRecord(rowId, record.getRecordId()); >> + break; >> + case APPEND_COLUMN_VALUES: >> + mutatableAction.appendColumns(rowId, record); >> + break; >> + case REPLACE_ENTIRE_RECORD: >> + mutatableAction.replaceRecord(rowId, record); >> + break; >> + case REPLACE_COLUMNS: >> + mutatableAction.replaceColumns(rowId, record); >> + break; >> + default: >> + throw new RuntimeException("Unsupported record mutation type [" + >> type + "]"); >> + } >> + } >> + } >> + >> + public void mutate(List<RowMutation> mutations) { >> + for (int i = 0; i < mutations.size(); i++) { >> + mutate(mutations.get(i)); >> + } >> + } >> + >> } >> >> http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/052c131e/blur-core/src/main/java/org/apache/blur/manager/writer/QueueReader.java >> ---------------------------------------------------------------------- >> diff --git >> a/blur-core/src/main/java/org/apache/blur/manager/writer/QueueReader.java >> b/blur-core/src/main/java/org/apache/blur/manager/writer/QueueReader.java >> new file mode 100644 >> index 0000000..f44dea9 >> --- /dev/null >> +++ b/blur-core/src/main/java/org/apache/blur/manager/writer/QueueReader.java >> @@ -0,0 +1,125 @@ >> +/** >> + * Licensed to the Apache Software Foundation (ASF) under one or more >> + * contributor license agreements. See the NOTICE file distributed with >> + * this work for additional information regarding copyright ownership. >> + * The ASF licenses this file to You under the Apache License, Version 2.0 >> + * (the "License"); you may not use this file except in compliance with >> + * the License. You may obtain a copy of the License at >> + * >> + * http://www.apache.org/licenses/LICENSE-2.0 >> + * >> + * Unless required by applicable law or agreed to in writing, software >> + * distributed under the License is distributed on an "AS IS" BASIS, >> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. >> + * See the License for the specific language governing permissions and >> + * limitations under the License. >> + */ >> +package org.apache.blur.manager.writer; >> + >> +import static >> org.apache.blur.utils.BlurConstants.BLUR_SHARD_INDEX_QUEUE_READER_BACKOFF; >> +import static >> org.apache.blur.utils.BlurConstants.BLUR_SHARD_INDEX_QUEUE_READER_MAX; >> + >> +import java.io.Closeable; >> +import java.io.IOException; >> +import java.util.ArrayList; >> +import java.util.List; >> +import java.util.concurrent.atomic.AtomicBoolean; >> + >> +import org.apache.blur.BlurConfiguration; >> +import org.apache.blur.log.Log; >> +import org.apache.blur.log.LogFactory; >> +import org.apache.blur.server.ShardContext; >> +import org.apache.blur.server.TableContext; >> +import org.apache.blur.thrift.generated.RowMutation; >> + >> +public abstract class QueueReader implements Closeable, Runnable { >> + >> + private static final Log LOG = LogFactory.getLog(QueueReader.class); >> + >> + protected final ShardContext _shardContext; >> + protected final BlurIndex _index; >> + protected final long _backOff; >> + protected final Thread _daemon; >> + protected final AtomicBoolean _running = new AtomicBoolean(); >> + protected final int _max; >> + protected final TableContext _tableContext; >> + >> + public QueueReader(BlurIndex index, ShardContext shardContext) { >> + _running.set(true); >> + _index = index; >> + _shardContext = shardContext; >> + _tableContext = _shardContext.getTableContext(); >> + BlurConfiguration configuration = _tableContext.getBlurConfiguration(); >> + _backOff = configuration.getLong(BLUR_SHARD_INDEX_QUEUE_READER_BACKOFF, >> 500); >> + _max = configuration.getInt(BLUR_SHARD_INDEX_QUEUE_READER_MAX, 500); >> + _daemon = new Thread(this); >> + _daemon.setName("Queue Loader for [" + _tableContext.getTable() + "/" + >> shardContext.getShard() + "]"); >> + _daemon.setDaemon(true); >> + _daemon.start(); >> + } > > Hmm... there's a timing issue here. Any subclasses that implement > queue-like things are likely to have costly initialization work to do. > The problem is that this thread starts [or could start] "taking" > before the subclass' constructor has completed its initialization. If > we keep this, then every implementation would be forced to create a > block on the initial take. > > We could move the runnable to an inner class, but that'd force callers > to call 'start' or 'listen' or somesuch, which we could use to start > the thread... thoughts? > > Thanks, > --tim
