http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/05e6d966/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/TopEntries.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/TopEntries.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/TopEntries.java new file mode 100644 index 0000000..48a2ca9 --- /dev/null +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/TopEntries.java @@ -0,0 +1,135 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package com.gemstone.gemfire.cache.lucene.internal.distributed;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

import com.gemstone.gemfire.DataSerializer;
import com.gemstone.gemfire.cache.lucene.LuceneQueryFactory;
import com.gemstone.gemfire.internal.DataSerializableFixedID;
import com.gemstone.gemfire.internal.Version;

/**
 * Holds an ordered collection of entries matching a search query. Entries are kept in the order they were added, and
 * {@link #addHit(EntryScore)} rejects any entry that scores higher than the one added before it, so the list is
 * always in non-increasing score order.
 *
 * @param <K> the type of key
 */
public class TopEntries<K> implements DataSerializableFixedID {
  // ordered collection of entries, highest score first
  private List<EntryScore<K>> hits = new ArrayList<>();

  // the maximum number of entries stored in this
  private int limit;

  // comparator to order entryScore instances
  final Comparator<EntryScore<K>> comparator = new EntryScoreComparator();

  /**
   * Creates a collection bounded by {@link LuceneQueryFactory#DEFAULT_LIMIT}.
   */
  public TopEntries() {
    this(LuceneQueryFactory.DEFAULT_LIMIT);
  }

  /**
   * Creates a collection that stores at most {@code limit} entries.
   *
   * @param limit the maximum number of entries to keep; must not be negative
   * @throws IllegalArgumentException if {@code limit} is negative
   */
  public TopEntries(int limit) {
    if (limit < 0) {
      throw new IllegalArgumentException("limit must not be negative: " + limit);
    }
    this.limit = limit;
  }

  /**
   * Adds an entry to the collection. The new entry must not have a higher score than the entry added before it (an
   * equal score is accepted). The ordering check is performed first; an entry that passes it is then silently ignored
   * if the limit is already reached.
   *
   * @param entry the entry to append
   * @throws IllegalArgumentException if {@code entry} scores higher than the last stored entry
   */
  public void addHit(EntryScore<K> entry) {
    if (!hits.isEmpty()) {
      EntryScore<K> lastEntry = hits.get(hits.size() - 1);
      if (comparator.compare(lastEntry, entry) < 0) {
        throw new IllegalArgumentException("Entries must be added in non-increasing score order: score "
            + entry.getScore() + " is higher than the last added score " + lastEntry.getScore());
      }
    }

    if (hits.size() >= limit) {
      return;
    }

    hits.add(entry);
  }

  /**
   * @return count of entries in the collection
   */
  public int size() {
    return hits.size();
  }

  /**
   * @return The entries collection managed by this instance. This is the internal list itself, not a copy.
   */
  public List<EntryScore<K>> getHits() {
    return hits;
  }

  /**
   * @return The maximum capacity of this collection
   */
  public int getLimit() {
    return limit;
  }

  /**
   * Compares scores of two entries using natural ordering, i.e. the result is negative if the first entry's score is
   * less than the second one's. Delegates to {@link Float#compare(float, float)}.
   */
  class EntryScoreComparator implements Comparator<EntryScore<K>> {
    @Override
    public int compare(EntryScore<K> o1, EntryScore<K> o2) {
      return Float.compare(o1.getScore(), o2.getScore());
    }
  }

  @Override
  public Version[] getSerializationVersions() {
    // no version-specific serialization forms are declared for this class
    return null;
  }

  @Override
  public int getDSFID() {
    return LUCENE_TOP_ENTRIES;
  }

  /**
   * Writes the limit followed by the list of hits.
   */
  @Override
  public void toData(DataOutput out) throws IOException {
    out.writeInt(limit);
    DataSerializer.writeObject(hits, out);
  }

  /**
   * Reads the limit and the list of hits, in the order written by {@link #toData(DataOutput)}.
   */
  @Override
  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
    limit = in.readInt();
    hits = DataSerializer.readObject(in);
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/05e6d966/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesCollector.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesCollector.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesCollector.java new file mode 100644 index 0000000..94b8a3a --- /dev/null +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesCollector.java @@ -0,0 +1,102 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package com.gemstone.gemfire.cache.lucene.internal.distributed;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import com.gemstone.gemfire.DataSerializer;
import com.gemstone.gemfire.cache.lucene.LuceneQueryFactory;
import com.gemstone.gemfire.cache.lucene.internal.repository.IndexResultCollector;
import com.gemstone.gemfire.internal.DataSerializableFixedID;
import com.gemstone.gemfire.internal.Version;

/**
 * An {@link IndexResultCollector} that gathers {@link EntryScore} instances into a {@link TopEntries}. Entries are
 * handed straight to {@link TopEntries#addHit}, so callers are expected to supply them ordered by score.
 */
public class TopEntriesCollector implements IndexResultCollector, DataSerializableFixedID {
  // collected entries; replaced wholesale by fromData
  private TopEntries entries;

  // label reported by getName(); may be null
  private String name;

  /**
   * Creates an unnamed collector with the default limit.
   */
  public TopEntriesCollector() {
    this(null);
  }

  /**
   * Creates a named collector bounded by {@link LuceneQueryFactory#DEFAULT_LIMIT}.
   */
  public TopEntriesCollector(String name) {
    this(name, LuceneQueryFactory.DEFAULT_LIMIT);
  }

  /**
   * Creates a named collector that keeps at most {@code limit} entries.
   */
  public TopEntriesCollector(String name, int limit) {
    this.entries = new TopEntries(limit);
    this.name = name;
  }

  /**
   * Wraps the key and score in an {@link EntryScore} and collects it.
   */
  @Override
  public void collect(Object key, float score) {
    EntryScore wrapped = new EntryScore(key, score);
    collect(wrapped);
  }

  /**
   * Appends an already constructed entry to the collected results.
   */
  public void collect(EntryScore entry) {
    entries.addHit(entry);
  }

  @Override
  public String getName() {
    return name;
  }

  /**
   * @return the number of collected entries, or 0 when there is no entries object
   */
  @Override
  public int size() {
    TopEntries current = getEntries();
    if (current != null) {
      return current.size();
    }
    return 0;
  }

  /**
   * @return The entries collected by this collector
   */
  public TopEntries getEntries() {
    return entries;
  }

  @Override
  public int getDSFID() {
    return LUCENE_TOP_ENTRIES_COLLECTOR;
  }

  @Override
  public Version[] getSerializationVersions() {
    return null;
  }

  /**
   * Writes the name followed by the collected entries.
   */
  @Override
  public void toData(DataOutput out) throws IOException {
    DataSerializer.writeString(name, out);
    DataSerializer.writeObject(entries, out);
  }

  /**
   * Reads the name and the collected entries, in the order written by {@link #toData(DataOutput)}.
   */
  @Override
  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
    name = DataSerializer.readString(in);
    entries = DataSerializer.readObject(in);
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/05e6d966/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesCollectorManager.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesCollectorManager.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesCollectorManager.java new file mode 100644 index 0000000..cf6e420 --- /dev/null +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesCollectorManager.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */

package com.gemstone.gemfire.cache.lucene.internal.distributed;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

import org.apache.logging.log4j.Logger;

import com.gemstone.gemfire.DataSerializer;
import com.gemstone.gemfire.cache.lucene.LuceneQueryFactory;
import com.gemstone.gemfire.cache.lucene.internal.distributed.TopEntries.EntryScoreComparator;
import com.gemstone.gemfire.cache.lucene.internal.repository.IndexResultCollector;
import com.gemstone.gemfire.internal.DataSerializableFixedID;
import com.gemstone.gemfire.internal.Version;
import com.gemstone.gemfire.internal.logging.LogService;

/**
 * An implementation of {@link CollectorManager} for managing {@link TopEntriesCollector}. This is used by a member to
 * collect top matching entries from local buckets
 */
public class TopEntriesCollectorManager implements CollectorManager<TopEntriesCollector>, DataSerializableFixedID {
  private static final Logger logger = LogService.getLogger();

  // maximum number of entries in a reduced result; also the limit given to every collector created here
  private int limit;

  // name given to the merged collector returned by reduce()
  private String id;

  /**
   * Creates a manager with a generated id and the default limit.
   */
  public TopEntriesCollectorManager() {
    this(null, 0);
  }

  /**
   * Creates a manager with the given id and the default limit.
   */
  public TopEntriesCollectorManager(String id) {
    this(id, 0);
  }

  /**
   * @param id the id of this manager; when {@code null}, the string form of this object's hash code is used
   * @param resultLimit the maximum number of entries to produce; a value of zero or less selects
   *        {@link LuceneQueryFactory#DEFAULT_LIMIT}
   */
  public TopEntriesCollectorManager(String id, int resultLimit) {
    this.limit = resultLimit <= 0 ? LuceneQueryFactory.DEFAULT_LIMIT : resultLimit;
    this.id = id == null ? String.valueOf(this.hashCode()) : id;
    // log the resolved id; the raw parameter may be null
    logger.debug("Max count of entries to be produced by {} is {}", this.id, limit);
  }

  @Override
  public TopEntriesCollector newCollector(String name) {
    return new TopEntriesCollector(name, limit);
  }

  /**
   * Merges the already ordered results of several collectors into one collector holding at most {@code limit}
   * entries, highest score first. The input collectors' hit lists are read but not modified.
   *
   * @param collectors the per-bucket collectors to merge
   * @return a new collector named after this manager's id; empty when {@code collectors} is empty
   */
  @Override
  public TopEntriesCollector reduce(Collection<TopEntriesCollector> collectors) {
    TopEntriesCollector mergedResult = new TopEntriesCollector(id, limit);
    if (collectors.isEmpty()) {
      return mergedResult;
    }

    final EntryScoreComparator scoreComparator = new TopEntries().new EntryScoreComparator();

    // orders scanners by the score of their next unread entry, lowest score first; the queue below reverses this
    Comparator<ListScanner> entryListComparator = new Comparator<ListScanner>() {
      @Override
      public int compare(ListScanner l1, ListScanner l2) {
        EntryScore o1 = l1.peek();
        EntryScore o2 = l2.peek();
        return scoreComparator.compare(o1, o2);
      }
    };

    // The queue contains scanners for all bucket results. The reversed comparator puts the scanner whose next entry
    // has the highest score at the head.
    PriorityQueue<ListScanner> entryListsPriorityQueue;
    entryListsPriorityQueue = new PriorityQueue<ListScanner>(collectors.size(),
        Collections.reverseOrder(entryListComparator));

    for (IndexResultCollector collector : collectors) {
      logger.debug("Number of entries found in collector {} is {}", collector.getName(), collector.size());

      // only non-empty lists are queued, so peek() on a queued scanner is always valid
      if (collector.size() > 0) {
        entryListsPriorityQueue.add(new ListScanner(((TopEntriesCollector) collector).getEntries().getHits()));
      }
    }

    logger.debug("Only {} count of entries will be reduced. Other entries will be ignored", limit);
    while (!entryListsPriorityQueue.isEmpty() && limit > mergedResult.size()) {

      ListScanner scanner = entryListsPriorityQueue.remove();
      EntryScore entry = scanner.next();
      mergedResult.collect(entry);

      // re-queue the scanner so it is re-ordered by its new head entry
      if (scanner.hasNext()) {
        entryListsPriorityQueue.add(scanner);
      }
    }

    logger.debug("Reduced size of {} is {}", mergedResult.getName(), mergedResult.size());
    return mergedResult;
  }

  /*
   * Utility class to iterate on hits without modifying it
   */
  static class ListScanner {
    private List<EntryScore> hits;
    private int index = 0;

    ListScanner(List<EntryScore> hits) {
      this.hits = hits;
    }

    boolean hasNext() {
      return index < hits.size();
    }

    // returns the next entry without advancing
    EntryScore peek() {
      return hits.get(index);
    }

    // returns the next entry and advances past it
    EntryScore next() {
      return hits.get(index++);
    }
  }

  @Override
  public Version[] getSerializationVersions() {
    return null;
  }

  @Override
  public int getDSFID() {
    return LUCENE_TOP_ENTRIES_COLLECTOR_MANAGER;
  }

  /**
   * Writes the id followed by the limit.
   */
  @Override
  public void toData(DataOutput out) throws IOException {
    DataSerializer.writeString(id, out);
    out.writeInt(limit);
  }

  /**
   * Reads the id and the limit, in the order written by {@link #toData(DataOutput)}.
   */
  @Override
  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
    id = DataSerializer.readString(in);
    limit = in.readInt();
  }

  /**
   * @return Id of this collector, if any
   */
  public String getId() {
    return id;
  }

  /**
   * @return Result limit enforced by the collectors created by this manager
   */
  public int getLimit() {
    return limit;
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/05e6d966/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesFunctionCollector.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesFunctionCollector.java 
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesFunctionCollector.java new file mode 100644 index 0000000..4a99bf8 --- /dev/null +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesFunctionCollector.java @@ -0,0 +1,158 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package com.gemstone.gemfire.cache.lucene.internal.distributed;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.logging.log4j.Logger;

import com.gemstone.gemfire.cache.execute.FunctionException;
import com.gemstone.gemfire.cache.execute.ResultCollector;
import com.gemstone.gemfire.cache.lucene.LuceneQuery;
import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.logging.LogService;

/**
 * A {@link ResultCollector} implementation for collecting and ordering {@link TopEntries}. The {@link TopEntries}
 * objects will be created by members when a {@link LuceneQuery} is executed on the local data hosted by the member.
 * Every per-member result is retained until it is merged, so the member executing this logic must have sufficient
 * space to hold all the {@link EntryScore} documents returned from the members.
 *
 * <p>
 * Merging is lazy: results passed to {@link ResultCollector#addResult} are only stored. They are reduced into a
 * single {@link TopEntries} the first time {@link ResultCollector#getResult} is invoked after
 * {@link ResultCollector#endResults}, and that merged result is cached for later calls.
 */
public class TopEntriesFunctionCollector implements ResultCollector<TopEntriesCollector, TopEntries> {
  private static final Logger logger = LogService.getLogger();

  // Use this instance to perform reduce operation
  final CollectorManager<TopEntriesCollector> manager;

  // released by endResults(); getResult() blocks on it
  private final CountDownLatch waitForResults = new CountDownLatch(1);

  final String id;

  // Instance of gemfire cache to check status and other utility methods; may be null
  private final GemFireCacheImpl cache;

  // per-member results awaiting the merge; also the monitor guarding all mutable state of this collector
  private final Collection<TopEntriesCollector> subResults = new ArrayList<>();
  private TopEntriesCollector mergedResults;

  public TopEntriesFunctionCollector() {
    this(null);
  }

  public TopEntriesFunctionCollector(LuceneFunctionContext<TopEntriesCollector> context) {
    this(context, null);
  }

  /**
   * @param context supplies the result limit and, optionally, the collector manager; may be {@code null}
   * @param cache used for naming this collector and for cancellation checks; may be {@code null}
   */
  public TopEntriesFunctionCollector(LuceneFunctionContext<TopEntriesCollector> context, GemFireCacheImpl cache) {
    this.cache = cache;
    if (cache == null) {
      id = String.valueOf(this.hashCode());
    } else {
      id = cache.getName();
    }

    final int limit = (context != null) ? context.getLimit() : 0;

    // fall back to a manager of our own when the context does not bring one
    if (context == null || context.getCollectorManager() == null) {
      this.manager = new TopEntriesCollectorManager(id, limit);
    } else {
      this.manager = context.getCollectorManager();
    }
  }

  /**
   * Blocks until {@link #endResults()} has been called, then returns the merged entries.
   */
  @Override
  public TopEntries getResult() throws FunctionException {
    try {
      waitForResults.await();
    } catch (InterruptedException e) {
      throw interrupted(e);
    }

    return aggregateResults();
  }

  /**
   * Like {@link #getResult()}, but fails with a {@link FunctionException} when results are not complete within the
   * given time.
   */
  @Override
  public TopEntries getResult(long timeout, TimeUnit unit) throws FunctionException {
    boolean completed;
    try {
      completed = waitForResults.await(timeout, unit);
    } catch (InterruptedException e) {
      throw interrupted(e);
    }

    if (!completed) {
      throw new FunctionException("Did not receive results from all members within wait time");
    }

    return aggregateResults();
  }

  /*
   * Common handling for an interrupted wait: logs, restores the interrupt flag, lets the cache raise its own
   * cancellation error if it has one, and otherwise hands back the exception for the caller to throw.
   */
  private FunctionException interrupted(InterruptedException e) {
    logger.debug("Interrupted while waiting for result collection", e);
    Thread.currentThread().interrupt();
    if (cache != null) {
      cache.getCancelCriterion().checkCancelInProgress(e);
    }
    return new FunctionException(e);
  }

  /*
   * Reduces the stored sub-results on first use and serves the cached merge afterwards.
   */
  private TopEntries aggregateResults() {
    synchronized (subResults) {
      if (mergedResults == null) {
        mergedResults = manager.reduce(subResults);
      }
      return mergedResults.getEntries();
    }
  }

  @Override
  public void endResults() {
    synchronized (subResults) {
      waitForResults.countDown();
    }
  }

  /**
   * Discards all results stored so far.
   *
   * @throws IllegalStateException if {@link #endResults()} has already been called
   */
  @Override
  public void clearResults() {
    synchronized (subResults) {
      ensureOpen();
      subResults.clear();
    }
  }

  /**
   * Stores one member's result for the later merge.
   *
   * @throws IllegalStateException if {@link #endResults()} has already been called
   */
  @Override
  public void addResult(DistributedMember memberID, TopEntriesCollector resultOfSingleExecution) {
    synchronized (subResults) {
      ensureOpen();
      subResults.add(resultOfSingleExecution);
    }
  }

  /*
   * Rejects the operation once the latch has been released. Callers hold the subResults monitor.
   */
  private void ensureOpen() {
    if (waitForResults.getCount() == 0) {
      throw new IllegalStateException("This collector is closed and cannot accept anymore results");
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/05e6d966/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/package-info.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/package-info.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/package-info.java new file mode 100644 index 0000000..4079ad4 --- /dev/null +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/package-info.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Classes used for distributing lucene queries to geode nodes. 
Contains the lucene related functions + * like {@link com.gemstone.gemfire.cache.lucene.internal.distributed.LuceneFunction} as well as objects that are + * passed between nodes like {@link com.gemstone.gemfire.cache.lucene.internal.distributed.EntryScore} + */ + +package com.gemstone.gemfire.cache.lucene.internal.distributed; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/05e6d966/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/ChunkKey.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/ChunkKey.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/ChunkKey.java new file mode 100644 index 0000000..8fbe356 --- /dev/null +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/ChunkKey.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */

package com.gemstone.gemfire.cache.lucene.internal.filesystem;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.UUID;

import com.gemstone.gemfire.internal.DataSerializableFixedID;
import com.gemstone.gemfire.internal.Version;

/**
 * The key for a single chunk on a file stored within a region. A key is the pair of the owning file's id and the
 * index of the chunk within that file.
 */
public class ChunkKey implements DataSerializableFixedID {
  // id of the file the chunk belongs to; null until fromData runs on an instance made by the no-arg constructor
  UUID fileId;
  int chunkId;

  /**
   * Constructor used for serialization only.
   */
  public ChunkKey() {
  }

  ChunkKey(UUID fileName, int chunkId) {
    this.fileId = fileName;
    this.chunkId = chunkId;
  }

  /**
   * @return the id of the file this chunk belongs to
   */
  public UUID getFileId() {
    return fileId;
  }

  /**
   * @return the chunkId
   */
  public int getChunkId() {
    return chunkId;
  }

  /**
   * Combines the file id and chunk id. A null file id contributes 0, matching {@link #equals(Object)}, which also
   * accepts a null file id; for a non-null file id the value is the same as before this null check was added.
   */
  @Override
  public int hashCode() {
    final int prime = 31;
    int result = 1;
    result = prime * result + (fileId == null ? 0 : fileId.hashCode());
    result = prime * result + chunkId;
    return result;
  }

  /**
   * Two keys are equal when they have the same chunk id and the same file id (both null counts as the same).
   */
  @Override
  public boolean equals(Object obj) {
    if (this == obj) {
      return true;
    }
    // instanceof is false for null, so this also rejects a null argument
    if (!(obj instanceof ChunkKey)) {
      return false;
    }
    ChunkKey other = (ChunkKey) obj;
    if (chunkId != other.chunkId) {
      return false;
    }
    if (fileId == null) {
      return other.fileId == null;
    }
    return fileId.equals(other.fileId);
  }

  @Override
  public Version[] getSerializationVersions() {
    return null;
  }

  @Override
  public int getDSFID() {
    return LUCENE_CHUNK_KEY;
  }

  /**
   * Writes the chunk id followed by the two halves of the file id. The file id must be set.
   */
  @Override
  public void toData(DataOutput out) throws IOException {
    out.writeInt(chunkId);
    out.writeLong(fileId.getMostSignificantBits());
    out.writeLong(fileId.getLeastSignificantBits());
  }

  /**
   * Reads the chunk id and rebuilds the file id, in the order written by {@link #toData(DataOutput)}.
   */
  @Override
  public void fromData(DataInput in)
      throws IOException, ClassNotFoundException {
    chunkId = in.readInt();
    long high = in.readLong();
    long low = in.readLong();
    fileId = new UUID(high, low);
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/05e6d966/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/File.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/File.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/File.java new file mode 100644 index 0000000..d27717e --- /dev/null +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/File.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.gemstone.gemfire.cache.lucene.internal.filesystem; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; +import java.nio.file.Files; +import java.util.UUID; + +import com.gemstone.gemfire.DataSerializer; +import com.gemstone.gemfire.InternalGemFireError; +import com.gemstone.gemfire.internal.DataSerializableFixedID; +import com.gemstone.gemfire.internal.Version; + +/** + * A file that is stored in a gemfire region. 
+ */
public class File implements DataSerializableFixedID {

  // not serialized by toData; set through setFileSystem
  private transient FileSystem fileSystem;
  private transient int chunkSize;

  private String name;
  long length = 0;
  int chunks = 0;
  long created = System.currentTimeMillis();
  long modified = created;
  UUID id = UUID.randomUUID();

  /**
   * Constructor for serialization only
   */
  public File() {
  }

  File(final FileSystem fileSystem, final String name) {
    setFileSystem(fileSystem);

    this.name = name;
  }

  /**
   * @return the name
   */
  public String getName() {
    return name;
  }

  /**
   * @return the length
   */
  public long getLength() {
    return length;
  }

  /**
   * @return the created
   */
  public long getCreated() {
    return created;
  }

  /**
   * @return the modified
   */
  public long getModified() {
    return modified;
  }

  /**
   * Get an input stream that reads from the beginning of the file.
   *
   * The input stream is not threadsafe
   */
  public SeekableInputStream getInputStream() {
    // TODO get read lock?
    return new FileInputStream(this);
  }

  /**
   * Get an output stream that appends to the end
   * of the file.
   */
  public OutputStream getOutputStream() {
    return new FileOutputStream(this);
  }

  /**
   * Attaches this file to a file system and takes the chunk size from {@link FileSystem#CHUNK_SIZE}.
   */
  void setFileSystem(final FileSystem fileSystem) {
    this.fileSystem = fileSystem;
    this.chunkSize = FileSystem.CHUNK_SIZE;
  }

  int getChunkSize() {
    return chunkSize;
  }

  public FileSystem getFileSystem() {
    return fileSystem;
  }

  @Override
  public Version[] getSerializationVersions() {
    return null;
  }

  @Override
  public int getDSFID() {
    return LUCENE_FILE;
  }

  /**
   * Writes name, length, chunk count, created and modified timestamps, then the two halves of the id. The file
   * system reference and chunk size are not written.
   */
  @Override
  public void toData(DataOutput out) throws IOException {
    DataSerializer.writeString(name, out);
    out.writeLong(length);
    out.writeInt(chunks);
    out.writeLong(created);
    out.writeLong(modified);
    out.writeLong(id.getMostSignificantBits());
    out.writeLong(id.getLeastSignificantBits());
  }

  /**
   * Reads the fields in the order written by {@link #toData(DataOutput)}.
   */
  @Override
  public void fromData(DataInput in)
      throws IOException, ClassNotFoundException {
    name = DataSerializer.readString(in);
    length = in.readLong();
    chunks = in.readInt();
    created = in.readLong();
    modified = in.readLong();
    long high = in.readLong();
    long low = in.readLong();
    id = new UUID(high, low);
  }


  /**
   * Export this to a {@link java.io.File}
   *
   * @param exportLocation the directory in which a file named after this file is created
   * @throws InternalGemFireError if the contents cannot be copied, e.g. because the target already exists
   */
  public void export(final java.io.File exportLocation)
  {
    java.io.File targetFile = new java.io.File(exportLocation, getName());
    // Files.copy does not close its source, so close the stream here on both success and failure
    try (SeekableInputStream in = getInputStream()) {
      Files.copy(in, targetFile.toPath());
    }
    catch (IOException e) {
      throw new InternalGemFireError("Could not export file " + getName(), e);
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/05e6d966/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/FileInputStream.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/FileInputStream.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/FileInputStream.java new file mode 100644 index 0000000..18194aa --- /dev/null 
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/FileInputStream.java @@ -0,0 +1,166 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package com.gemstone.gemfire.cache.lucene.internal.filesystem;

import java.io.EOFException;
import java.io.IOException;

/**
 * An input stream that reads chunks from
 * a File saved in the region. This input stream
 * will keep going back to the region to look for
 * chunks until nothing is found.
 */
final class FileInputStream extends SeekableInputStream {

  private final File file;
  // the chunk currently being read; null when the last fetch found nothing
  private byte[] chunk = null;
  // offset of the next unread byte within chunk
  private int chunkPosition = 0;
  // index of the NEXT chunk to fetch; the chunk held in 'chunk' has index chunkId - 1
  private int chunkId = 0;
  private boolean open = true;

  /**
   * Opens a stream positioned at the start of the file; the first chunk is fetched immediately.
   */
  public FileInputStream(File file) {
    this.file = file;
    nextChunk();
  }

  /**
   * Copy constructor used by {@link #clone()}. The copy has its own position but shares the current chunk array.
   */
  public FileInputStream(FileInputStream other) {
    this.file = other.file;
    this.chunk = other.chunk;
    this.chunkId = other.chunkId;
    this.chunkPosition = other.chunkPosition;
    this.open = other.open;
  }

  @Override
  public int read() throws IOException {
    assertOpen();

    checkAndFetchNextChunk();

    if (null == chunk) {
      return -1;
    }

    // mask so the byte is returned as an unsigned value 0..255
    return chunk[chunkPosition++] & 0xff;
  }

  /**
   * Moves to the given absolute byte position, fetching the containing chunk unless it is the one already held.
   *
   * @throws EOFException if {@code position} is beyond the length of the file
   */
  @Override
  public void seek(long position) throws IOException {
    if(position > file.length) {
      throw new EOFException();
    }
    int targetChunk = (int) (position / file.getChunkSize());
    int targetPosition = (int) (position % file.getChunkSize());

    if(targetChunk != (this.chunkId - 1)) {
      chunk = file.getFileSystem().getChunk(this.file, targetChunk);
      chunkId = targetChunk + 1;
      chunkPosition = targetPosition;
    } else {
      chunkPosition = targetPosition;
    }
  }



  /**
   * Skips by seeking to the current position plus {@code n}.
   *
   * @return {@code n}
   * @throws EOFException if the target position is beyond the length of the file
   */
  @Override
  public long skip(long n) throws IOException {
    // widen before multiplying: chunk index * chunk size overflows int once the position passes 2 GB
    long currentPosition = (long) (chunkId - 1) * file.getChunkSize() + chunkPosition;
    seek(currentPosition + n);
    return n;
  }

  /**
   * Repositions the stream at the start of the file.
   */
  @Override
  public void reset() throws IOException {
    seek(0);
  }

  /**
   * Reads up to {@code len} bytes, crossing chunk boundaries as needed.
   *
   * @return the number of bytes read, or -1 if no chunk is available when the call starts
   */
  @Override
  public int read(byte[] b, int off, int len) throws IOException {
    assertOpen();

    checkAndFetchNextChunk();

    if (null == chunk) {
      return -1;
    }

    int read = 0;
    while (len > 0) {
      final int min = Math.min(remaining(), len);
      System.arraycopy(chunk, chunkPosition, b, off, min);
      off += min;
      len -= min;
      chunkPosition += min;
      read += min;

      if (len > 0) {
        // we read to the end of the chunk, fetch another.
        nextChunk();
        if (null == chunk) {
          break;
        }
      }
    }

    return read;
  }

  /**
   * @return the number of unread bytes in the current chunk, or 0 when no chunk is loaded
   */
  @Override
  public int available() throws IOException {
    assertOpen();

    // chunk is null for an empty file or after the last chunk has been consumed
    return chunk == null ? 0 : remaining();
  }

  @Override
  public void close() throws IOException {
    open = false;
  }

  // unread bytes in the current chunk; callers must ensure chunk is non-null
  private int remaining() {
    return chunk.length - chunkPosition;
  }

  // advances to the next chunk only when the current one is fully consumed
  private void checkAndFetchNextChunk() {
    if (null != chunk && remaining() <= 0) {
      nextChunk();
    }
  }

  // fetches chunk number chunkId and advances chunkId; 'chunk' takes whatever the file system returns
  private void nextChunk() {
    chunk = file.getFileSystem().getChunk(this.file, chunkId++);
    chunkPosition = 0;
  }

  private void assertOpen() throws IOException {
    if (!open) {
      throw new IOException("Closed");
    }
  }

  @Override
  public FileInputStream clone() {
    return new FileInputStream(this);
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/05e6d966/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/FileOutputStream.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/FileOutputStream.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/FileOutputStream.java new file mode 100644 index 0000000..3f9f614 --- /dev/null +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/FileOutputStream.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.gemstone.gemfire.cache.lucene.internal.filesystem; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.Arrays; + +final class FileOutputStream extends OutputStream { + + private final File file; + private ByteBuffer buffer; + private boolean open = true; + private long length; + private int chunks; + + public FileOutputStream(final File file) { + this.file = file; + buffer = ByteBuffer.allocate(file.getChunkSize()); + this.length = file.length; + this.chunks = file.chunks; + if(chunks > 0 && file.length % file.getChunkSize() != 0) { + //If the last chunk was incomplete, we're going to update it + //rather than add a new chunk. This guarantees that all chunks + //are full except for the last chunk. 
+ chunks--; + byte[] previousChunkData = file.getFileSystem().getChunk(file, chunks); + buffer.put(previousChunkData); + } + } + + @Override + public void write(final int b) throws IOException { + assertOpen(); + + if (buffer.remaining() == 0) { + flushBuffer(); + } + + buffer.put((byte) b); + length++; + } + + @Override + public void write(final byte[] b, int off, int len) throws IOException { + assertOpen(); + + while (len > 0) { + if (buffer.remaining() == 0) { + flushBuffer(); + } + + final int min = Math.min(buffer.remaining(), len); + buffer.put(b, off, min); + off += min; + len -= min; + length += min; + } + } + + @Override + public void close() throws IOException { + if (open) { + flushBuffer(); + file.modified = System.currentTimeMillis(); + file.length = length; + file.chunks = chunks; + file.getFileSystem().updateFile(file); + open = false; + buffer = null; + } + } + + private void flushBuffer() { + byte[] chunk = Arrays.copyOfRange(buffer.array(), buffer.arrayOffset(), buffer.position()); + file.getFileSystem().putChunk(file, chunks++, chunk); + buffer.rewind(); + } + + private void assertOpen() throws IOException { + if (!open) { + throw new IOException("Closed"); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/05e6d966/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/FileSystem.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/FileSystem.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/FileSystem.java new file mode 100644 index 0000000..5f4fb77 --- /dev/null +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/FileSystem.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.gemstone.gemfire.cache.lucene.internal.filesystem; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.Collection; +import java.util.concurrent.ConcurrentMap; + +/** + * A Filesystem like interface that stores file data in geode regions. + * + * This filesystem is safe for use with multiple threads if the threads are not + * modifying the same files. A single file is not safe to modify by multiple + * threads, even between different members of the distributed system. + * + * Changes to a file may not be visible to other members of the system until the + * FileOutputStream is closed. + * + */ +public class FileSystem { + // private final Cache cache; + private final ConcurrentMap<String, File> fileRegion; + private final ConcurrentMap<ChunkKey, byte[]> chunkRegion; + + static final int CHUNK_SIZE = 1024 * 1024; //1 MB + private final FileSystemStats stats; + + /** + * Create filesystem that will store data in the two provided regions. The fileRegion contains + * metadata about the files, and the chunkRegion contains the actual data. 
If data from either region is missing + * or inconsistent, no guarantees are made about what this class will do, so it's best if these regions are colocated + * and in the same disk store to ensure the data remains together. + * @param fileRegion the region to store metadata about the files + * @param chunkRegion the region to store actual file data. + */ + public FileSystem(ConcurrentMap<String, File> fileRegion, ConcurrentMap<ChunkKey, byte[]> chunkRegion, FileSystemStats stats) { + this.fileRegion = fileRegion; + this.chunkRegion = chunkRegion; + this.stats = stats; + } + + public Collection<String> listFileNames() { + return fileRegion.keySet(); + } + + public File createFile(final String name) throws IOException { + // TODO lock region ? + final File file = new File(this, name); + if (null != fileRegion.putIfAbsent(name, file)) { + throw new IOException("File exists."); + } + stats.incFileCreates(1); + // TODO unlock region ? + return file; + } + + public File createTemporaryFile(final String name) throws IOException { + final File file = new File(this, name); + stats.incTemporaryFileCreates(1); + return file; + } + + public File getFile(final String name) throws FileNotFoundException { + final File file = fileRegion.get(name); + + if (null == file) { + throw new FileNotFoundException(name); + } + + file.setFileSystem(this); + return file; + } + + public void deleteFile(final String name) throws FileNotFoundException { + // TODO locks? + + // TODO - What is the state of the system if + // things crash in the middle of removing this file? + // Seems like a file will be left with some + // dangling chunks at the end of the file + File file = fileRegion.remove(name); + if(file == null) { + throw new FileNotFoundException(name); + } + + // TODO consider removeAll with all ChunkKeys listed. 
+ final ChunkKey key = new ChunkKey(file.id, 0); + while (true) { + // TODO consider mutable ChunkKey + if (null == chunkRegion.remove(key)) { + // no more chunks + break; + } + key.chunkId++; + } + + stats.incFileDeletes(1); + } + + public void renameFile(String source, String dest) throws IOException { + final File sourceFile = fileRegion.get(source); + if (null == sourceFile) { + throw new FileNotFoundException(source); + } + + final File destFile = createFile(dest); + + destFile.chunks = sourceFile.chunks; + destFile.created = sourceFile.created; + destFile.length = sourceFile.length; + destFile.modified = sourceFile.modified; + destFile.id = sourceFile.id; + updateFile(destFile); + + // TODO - What is the state of the system if + // things crash in the middle of moving this file? + // Seems like we will have two files pointing + // at the same data + + fileRegion.remove(source); + + stats.incFileRenames(1); + } + + byte[] getChunk(final File file, final int id) { + final ChunkKey key = new ChunkKey(file.id, id); + + //The file's metadata indicates that this chunk shouldn't + //exist. 
Purge all of the chunks that are larger than the file metadata + if(id >= file.chunks) { + while(chunkRegion.containsKey(key)) { + chunkRegion.remove(key); + key.chunkId++; + } + + return null; + } + + final byte[] chunk = chunkRegion.get(key); + stats.incReadBytes(chunk.length); + return chunk; + } + + public void putChunk(final File file, final int id, final byte[] chunk) { + final ChunkKey key = new ChunkKey(file.id, id); + chunkRegion.put(key, chunk); + stats.incWrittenBytes(chunk.length); + } + + void updateFile(File file) { + fileRegion.put(file.getName(), file); + } + + public ConcurrentMap<String, File> getFileRegion() { + return fileRegion; + } + + public ConcurrentMap<ChunkKey, byte[]> getChunkRegion() { + return chunkRegion; + } + + /** + * Export all of the files in the filesystem to the provided directory + */ + public void export(final java.io.File exportLocation) { + + listFileNames().stream().forEach(fileName-> { + try { + getFile(fileName).export(exportLocation); + } + catch (FileNotFoundException e) { + //ignore this, it was concurrently removed + } + + }); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/05e6d966/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/FileSystemStats.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/FileSystemStats.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/FileSystemStats.java new file mode 100644 index 0000000..e6bbf0d --- /dev/null +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/FileSystemStats.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.gemstone.gemfire.cache.lucene.internal.filesystem; + +import java.util.function.IntSupplier; +import java.util.function.LongSupplier; + +import com.gemstone.gemfire.StatisticDescriptor; +import com.gemstone.gemfire.Statistics; +import com.gemstone.gemfire.StatisticsFactory; +import com.gemstone.gemfire.StatisticsType; +import com.gemstone.gemfire.StatisticsTypeFactory; +import com.gemstone.gemfire.internal.statistics.StatisticsTypeFactoryImpl; + +public class FileSystemStats { + private static final StatisticsType statsType; + private static final String statsTypeName = "FileSystemStats"; + private static final String statsTypeDescription = "Statistics about in memory file system implementation"; + + private final Statistics stats; + + private static final int readBytesId; + private static final int writtenBytesId; + private static final int fileCreatesId; + private static final int temporaryFileCreatesId; + private static final int fileDeletesId; + private static final int fileRenamesId; + private static final int filesId; + private static final int chunksId; + private static final int bytesId; + + static { + final StatisticsTypeFactory f = StatisticsTypeFactoryImpl.singleton(); + statsType = f.createType( + statsTypeName, + statsTypeDescription, + new StatisticDescriptor[] { + f.createLongCounter("readBytes", "Number of bytes written", "bytes"), + 
f.createLongCounter("writtenBytes", "Number of bytes read", "bytes"), + f.createIntCounter("fileCreates", "Number of files created", "files"), + f.createIntCounter("temporaryFileCreates", "Number of temporary files created", "files"), + f.createIntCounter("fileDeletes", "Number of files deleted", "files"), + f.createIntCounter("fileRenames", "Number of files renamed", "files"), + f.createIntGauge("files", "Number of files on this member", "files"), + f.createIntGauge("chunks", "Number of file chunks on this member", "chunks"), + f.createLongGauge("bytes", "Number of bytes on this member", "bytes"), + } + ); + + readBytesId = statsType.nameToId("readBytes"); + writtenBytesId = statsType.nameToId("writtenBytes"); + fileCreatesId = statsType.nameToId("fileCreates"); + temporaryFileCreatesId = statsType.nameToId("temporaryFileCreates"); + fileDeletesId = statsType.nameToId("fileDeletes"); + fileRenamesId = statsType.nameToId("fileRenames"); + filesId = statsType.nameToId("files"); + chunksId = statsType.nameToId("chunks"); + bytesId = statsType.nameToId("bytes"); + } + + public FileSystemStats(StatisticsFactory f, String name) { + this.stats = f.createAtomicStatistics(statsType, name); + } + + public void incReadBytes(int delta) { + stats.incLong(readBytesId, delta); + } + + public void incWrittenBytes(int delta) { + stats.incLong(writtenBytesId, delta); + } + + public void incFileCreates(final int delta) { + stats.incInt(fileCreatesId,delta); + } + + public void incTemporaryFileCreates(final int delta) { + stats.incInt(temporaryFileCreatesId, delta); + } + + public void incFileDeletes(final int delta) { + stats.incInt(fileDeletesId,delta); + } + + public void incFileRenames(final int delta) { + stats.incInt(fileRenamesId,delta); + } + + public void setFileSupplier(IntSupplier supplier) { + stats.setIntSupplier(filesId, supplier); + } + + public int getFiles() { + return stats.getInt(filesId); + } + + public void setChunkSupplier(IntSupplier supplier) { + 
stats.setIntSupplier(chunksId, supplier); + } + + public int getChunks() { + return stats.getInt(chunksId); + } + + public void setBytesSupplier(LongSupplier supplier) { + stats.setLongSupplier(bytesId, supplier); + } + + public long getBytes() { + return stats.getLong(bytesId); + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/05e6d966/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/SeekableInputStream.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/SeekableInputStream.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/SeekableInputStream.java new file mode 100644 index 0000000..e10e0c4 --- /dev/null +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/SeekableInputStream.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.gemstone.gemfire.cache.lucene.internal.filesystem; + +import java.io.IOException; +import java.io.InputStream; + +/** + * An input stream that supports seeking to a particular position. 
/**
 * An {@link InputStream} whose read position can be moved to an absolute
 * offset.
 */
public abstract class SeekableInputStream extends InputStream {

  /**
   * Moves the read position of this stream. Positions are counted from the
   * beginning of the stream, that is, from just before the first byte that
   * was ever read.
   *
   * @param position the absolute offset to move to
   * @throws IOException if the seek goes past the end of the stream
   */
  public abstract void seek(long position) throws IOException;

  /**
   * @return a stream of the same kind as this one; what state the copy
   *         carries over is defined by the concrete subclass
   */
  @Override
  public abstract SeekableInputStream clone();
}
+ */ +/** + * A distributed filesystem implementation that uses a geode region as the underlying storage mechanism. + * + * Users of this filesystem should interact with the {@link com.gemstone.gemfire.cache.lucene.internal.filesystem.FileSystem} class. + * + */ + +package com.gemstone.gemfire.cache.lucene.internal.filesystem; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/05e6d966/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/management/LuceneIndexMetrics.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/management/LuceneIndexMetrics.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/management/LuceneIndexMetrics.java new file mode 100644 index 0000000..ba9f73b --- /dev/null +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/management/LuceneIndexMetrics.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Immutable snapshot of the query, update, commit and document figures of a
 * single Lucene index, identified by region path and index name.
 */
public class LuceneIndexMetrics {

  private final String regionPath;
  private final String indexName;

  // query figures
  private final int queryExecutions;
  private final long queryExecutionTime;
  private final float queryRate;
  private final long queryRateAverageLatency;
  private final int queryExecutionsInProgress;
  private final long queryExecutionTotalHits;

  // update figures
  private final int updates;
  private final long updateTime;
  private final float updateRate;
  private final long updateRateAverageLatency;
  private final int updatesInProgress;

  // commit figures
  private final int commits;
  private final long commitTime;
  private final float commitRate;
  private final long commitRateAverageLatency;
  private final int commitsInProgress;

  private final int documents;

  /**
   * This constructor is to be used by internal JMX framework only. A user should
   * not try to create an instance of this class.
   */
  @ConstructorProperties( { "regionPath", "indexName", "queryExecutions", "queryExecutionTime", "queryRate",
      "queryRateAverageLatency", "queryExecutionsInProgress", "queryExecutionTotalHits", "updates",
      "updateTime", "updateRate", "updateRateAverageLatency", "updatesInProgress", "commits",
      "commitTime", "commitRate", "commitRateAverageLatency", "commitsInProgress", "documents"
  })
  public LuceneIndexMetrics(String regionPath, String indexName, int queryExecutions, long queryExecutionTime,
      float queryRate, long queryRateAverageLatency, int queryExecutionsInProgress, long queryExecutionTotalHits,
      int updates, long updateTime, float updateRate, long updateRateAverageLatency, int updatesInProgress,
      int commits, long commitTime, float commitRate, long commitRateAverageLatency, int commitsInProgress,
      int documents) {
    this.regionPath = regionPath;
    this.indexName = indexName;

    this.queryExecutions = queryExecutions;
    this.queryExecutionTime = queryExecutionTime;
    this.queryRate = queryRate;
    this.queryRateAverageLatency = queryRateAverageLatency;
    this.queryExecutionsInProgress = queryExecutionsInProgress;
    this.queryExecutionTotalHits = queryExecutionTotalHits;

    this.updates = updates;
    this.updateTime = updateTime;
    this.updateRate = updateRate;
    this.updateRateAverageLatency = updateRateAverageLatency;
    this.updatesInProgress = updatesInProgress;

    this.commits = commits;
    this.commitTime = commitTime;
    this.commitRate = commitRate;
    this.commitRateAverageLatency = commitRateAverageLatency;
    this.commitsInProgress = commitsInProgress;

    this.documents = documents;
  }

  public String getRegionPath() {
    return regionPath;
  }

  public String getIndexName() {
    return indexName;
  }

  public int getQueryExecutions() {
    return queryExecutions;
  }

  public long getQueryExecutionTime() {
    return queryExecutionTime;
  }

  public float getQueryRate() {
    return queryRate;
  }

  public long getQueryRateAverageLatency() {
    return queryRateAverageLatency;
  }

  public int getQueryExecutionsInProgress() {
    return queryExecutionsInProgress;
  }

  public long getQueryExecutionTotalHits() {
    return queryExecutionTotalHits;
  }

  public int getUpdates() {
    return updates;
  }

  public long getUpdateTime() {
    return updateTime;
  }

  public float getUpdateRate() {
    return updateRate;
  }

  public long getUpdateRateAverageLatency() {
    return updateRateAverageLatency;
  }

  public int getUpdatesInProgress() {
    return updatesInProgress;
  }

  public int getCommits() {
    return commits;
  }

  public long getCommitTime() {
    return commitTime;
  }

  public float getCommitRate() {
    return commitRate;
  }

  public long getCommitRateAverageLatency() {
    return commitRateAverageLatency;
  }

  public int getCommitsInProgress() {
    return commitsInProgress;
  }

  public int getDocuments() {
    return documents;
  }

  /**
   * @return the simple class name followed by every field as
   *         {@code name=value} pairs separated by "; " inside square brackets
   */
  @Override
  public String toString() {
    return getClass().getSimpleName()
        + "["
        + "regionPath=" + regionPath
        + "; indexName=" + indexName
        + "; queryExecutions=" + queryExecutions
        + "; queryExecutionTime=" + queryExecutionTime
        + "; queryRate=" + queryRate
        + "; queryRateAverageLatency=" + queryRateAverageLatency
        + "; queryExecutionsInProgress=" + queryExecutionsInProgress
        + "; queryExecutionTotalHits=" + queryExecutionTotalHits
        + "; updates=" + updates
        + "; updateTime=" + updateTime
        + "; updateRate=" + updateRate
        + "; updateRateAverageLatency=" + updateRateAverageLatency
        + "; updatesInProgress=" + updatesInProgress
        + "; commits=" + commits
        + "; commitTime=" + commitTime
        + "; commitRate=" + commitRate
        + "; commitRateAverageLatency=" + commitRateAverageLatency
        + "; commitsInProgress=" + commitsInProgress
        + "; documents=" + documents
        + "]";
  }
}
+ */ +package com.gemstone.gemfire.cache.lucene.internal.management; + +import com.gemstone.gemfire.cache.lucene.LuceneIndex; +import com.gemstone.gemfire.cache.lucene.internal.LuceneIndexImpl; +import com.gemstone.gemfire.management.internal.ManagementStrings; +import com.gemstone.gemfire.management.internal.beans.stats.MBeanStatsMonitor; +import com.gemstone.gemfire.management.internal.beans.stats.StatType; +import com.gemstone.gemfire.management.internal.beans.stats.StatsAverageLatency; +import com.gemstone.gemfire.management.internal.beans.stats.StatsRate; + +public class LuceneIndexStatsMonitor extends MBeanStatsMonitor { + + private StatsRate updateRate; + + private StatsAverageLatency updateRateAverageLatency; + + private StatsRate commitRate; + + private StatsAverageLatency commitRateAverageLatency; + + private StatsRate queryRate; + + private StatsAverageLatency queryRateAverageLatency; + + public static final String LUCENE_SERVICE_MXBEAN_MONITOR_PREFIX = "LuceneServiceMXBeanMonitor_"; + + public LuceneIndexStatsMonitor(LuceneIndex index) { + super(LUCENE_SERVICE_MXBEAN_MONITOR_PREFIX + index.getRegionPath() + "_" + index.getName()); + addStatisticsToMonitor(((LuceneIndexImpl) index).getIndexStats().getStats()); + configureMetrics(); + } + + private void configureMetrics() { + this.queryRate = new StatsRate(StatsKey.QUERIES, StatType.INT_TYPE, this); + + this.updateRate = new StatsRate(StatsKey.UPDATES, StatType.INT_TYPE, this); + + this.commitRate = new StatsRate(StatsKey.COMMITS, StatType.INT_TYPE, this); + + this.queryRateAverageLatency = new StatsAverageLatency( + StatsKey.QUERIES, StatType.INT_TYPE, StatsKey.QUERY_TIME, this); + + this.updateRateAverageLatency = new StatsAverageLatency( + StatsKey.UPDATES, StatType.INT_TYPE, StatsKey.UPDATE_TIME, this); + + this.commitRateAverageLatency = new StatsAverageLatency( + StatsKey.COMMITS, StatType.INT_TYPE, StatsKey.COMMIT_TIME, this); + } + + protected LuceneIndexMetrics getIndexMetrics(LuceneIndex index) 
{ + int queryExecutions = getStatistic(StatsKey.QUERIES).intValue(); + long queryExecutionTime = getStatistic(StatsKey.QUERY_TIME).longValue(); + float queryRateValue = this.queryRate.getRate(); + long queryRateAverageLatencyValue = this.queryRateAverageLatency.getAverageLatency(); + int queryExecutionsInProgress = getStatistic(StatsKey.QUERIES_IN_PROGRESS).intValue(); + long queryExecutionTotalHits = getStatistic(StatsKey.QUERIES_TOTAL_HITS).longValue(); + + int updates = getStatistic(StatsKey.UPDATES).intValue(); + long updateTime = getStatistic(StatsKey.UPDATE_TIME).longValue(); + float updateRateValue = this.updateRate.getRate(); + long updateRateAverageLatencyValue = this.updateRateAverageLatency.getAverageLatency(); + int updatesInProgress = getStatistic(StatsKey.UPDATES_IN_PROGRESS).intValue(); + + int commits = getStatistic(StatsKey.COMMITS).intValue(); + long commitTime = getStatistic(StatsKey.COMMIT_TIME).longValue(); + float commitRateValue = this.commitRate.getRate(); + long commitRateAverageLatencyValue = this.commitRateAverageLatency.getAverageLatency(); + int commitsInProgress = getStatistic(StatsKey.COMMITS_IN_PROGRESS).intValue(); + + int documents = getStatistic(StatsKey.DOCUMENTS).intValue(); + + return new LuceneIndexMetrics(index.getRegionPath(), index.getName(), queryExecutions, queryExecutionTime, + queryRateValue, queryRateAverageLatencyValue, queryExecutionsInProgress, queryExecutionTotalHits, + updates, updateTime, updateRateValue, updateRateAverageLatencyValue, updatesInProgress, commits, + commitTime, commitRateValue, commitRateAverageLatencyValue, commitsInProgress, documents); + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/05e6d966/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/management/LuceneServiceBridge.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/management/LuceneServiceBridge.java 
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/management/LuceneServiceBridge.java new file mode 100644 index 0000000..edbfadc --- /dev/null +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/management/LuceneServiceBridge.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package com.gemstone.gemfire.cache.lucene.internal.management;
+
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.lucene.LuceneIndex;
+import com.gemstone.gemfire.cache.lucene.LuceneService;
+import com.gemstone.gemfire.cache.lucene.internal.LuceneIndexImpl;
+import com.gemstone.gemfire.cache.lucene.internal.LuceneIndexStats;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Bridge between a {@link LuceneService} and its management bean. It keeps one
+ * {@link LuceneIndexStatsMonitor} per index, keyed by region path and index name, and uses those
+ * monitors to produce {@link LuceneIndexMetrics}.
+ */
+public class LuceneServiceBridge {
+
+  private final LuceneService service;
+
+  private final ConcurrentMap<String, LuceneIndexStatsMonitor> monitors;
+
+  public LuceneServiceBridge(LuceneService service) {
+    this.service = service;
+    this.monitors = new ConcurrentHashMap<>();
+  }
+
+  /**
+   * Creates a stats monitor for the index and registers it, replacing any monitor that was
+   * registered before under the same region path and index name.
+   *
+   * @param index the index to monitor
+   */
+  public void addIndex(LuceneIndex index) {
+    // Create monitor on the index
+    LuceneIndexStatsMonitor monitor = new LuceneIndexStatsMonitor(index);
+
+    // Register the monitor
+    this.monitors.put(getMonitorKey(index), monitor);
+  }
+
+  /**
+   * Returns the metrics of every index known to the service.
+   *
+   * @return one {@link LuceneIndexMetrics} per index; never contains null elements
+   */
+  public LuceneIndexMetrics[] listIndexMetrics() {
+    // Fetch the indexes once and collect into a list. Sizing an array from one getAllIndexes()
+    // call and filling it from a second call can overflow the array, or leave null slots, if
+    // an index is added or removed in between.
+    Collection<LuceneIndex> indexes = this.service.getAllIndexes();
+    List<LuceneIndexMetrics> indexMetrics = new ArrayList<>(indexes.size());
+    for (LuceneIndex index : indexes) {
+      indexMetrics.add(getIndexMetrics((LuceneIndexImpl) index));
+    }
+    return indexMetrics.toArray(new LuceneIndexMetrics[indexMetrics.size()]);
+  }
+
+  /**
+   * Returns the metrics of the indexes defined on one region.
+   *
+   * @param regionPath the path of the region; {@link Region#SEPARATOR} is prepended if it is
+   *        missing
+   * @return the metrics of the indexes whose region path equals the given path; empty if there
+   *         are none
+   */
+  public LuceneIndexMetrics[] listIndexMetrics(String regionPath) {
+    if (!regionPath.startsWith(Region.SEPARATOR)) {
+      regionPath = Region.SEPARATOR + regionPath;
+    }
+    List<LuceneIndexMetrics> indexMetrics = new ArrayList<>();
+    for (LuceneIndex index : this.service.getAllIndexes()) {
+      if (index.getRegionPath().equals(regionPath)) {
+        indexMetrics.add(getIndexMetrics((LuceneIndexImpl) index));
+      }
+    }
+    return indexMetrics.toArray(new LuceneIndexMetrics[indexMetrics.size()]);
+  }
+
+  /**
+   * Returns the metrics of a single index.
+   *
+   * @param regionPath the path of the region the index is defined on
+   * @param indexName the name of the index
+   * @return the metrics of the index, or null if the service returns no index for the given
+   *         name and region path
+   */
+  public LuceneIndexMetrics listIndexMetrics(String regionPath, String indexName) {
+    LuceneIndexImpl index = (LuceneIndexImpl) this.service.getIndex(indexName, regionPath);
+    return index == null ? null : getIndexMetrics(index);
+  }
+
+  private String getMonitorKey(LuceneIndex index) {
+    return index.getRegionPath() + "_" + index.getName();
+  }
+
+  private LuceneIndexMetrics getIndexMetrics(LuceneIndexImpl index) {
+    String key = getMonitorKey(index);
+    LuceneIndexStatsMonitor monitor = this.monitors.get(key);
+    if (monitor == null) {
+      // Nothing in this class guarantees that addIndex was called for every index the service
+      // returns, so register a monitor on demand instead of failing with a
+      // NullPointerException. putIfAbsent keeps a monitor that a concurrent caller registered
+      // first.
+      LuceneIndexStatsMonitor created = new LuceneIndexStatsMonitor(index);
+      LuceneIndexStatsMonitor existing = this.monitors.putIfAbsent(key, created);
+      monitor = (existing != null) ? existing : created;
+    }
+    return monitor.getIndexMetrics(index);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/05e6d966/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/management/LuceneServiceMBean.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/management/LuceneServiceMBean.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/management/LuceneServiceMBean.java
new file mode 100644
index 0000000..4320a9a
--- /dev/null
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/management/LuceneServiceMBean.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.lucene.internal.management;
+
+import com.gemstone.gemfire.cache.lucene.LuceneIndex;
+import com.gemstone.gemfire.cache.lucene.LuceneService;
+import com.gemstone.gemfire.management.internal.beans.CacheServiceMBeanBase;
+
+import javax.management.NotificationBroadcasterSupport;
+
+/**
+ * Implementation of {@link LuceneServiceMXBean}. Every metrics request is delegated to a
+ * {@link LuceneServiceBridge} that is created for the service passed to the constructor.
+ */
+public class LuceneServiceMBean extends NotificationBroadcasterSupport
+    implements LuceneServiceMXBean, CacheServiceMBeanBase {
+
+  private final LuceneServiceBridge bridge;
+
+  public LuceneServiceMBean(LuceneService service) {
+    bridge = new LuceneServiceBridge(service);
+  }
+
+  /** Delegates to {@link LuceneServiceBridge#listIndexMetrics()}. */
+  @Override
+  public LuceneIndexMetrics[] listIndexMetrics() {
+    return bridge.listIndexMetrics();
+  }
+
+  /** Delegates to {@link LuceneServiceBridge#listIndexMetrics(String)}. */
+  @Override
+  public LuceneIndexMetrics[] listIndexMetrics(String regionPath) {
+    return bridge.listIndexMetrics(regionPath);
+  }
+
+  /** Delegates to {@link LuceneServiceBridge#listIndexMetrics(String, String)}. */
+  @Override
+  public LuceneIndexMetrics listIndexMetrics(String regionPath, String indexName) {
+    return bridge.listIndexMetrics(regionPath, indexName);
+  }
+
+  /** @return the fixed id of this bean, {@code "LuceneService"} */
+  @Override
+  public String getId() {
+    return "LuceneService";
+  }
+
+  /** @return the MXBean interface implemented by this bean */
+  @Override
+  public Class getInterfaceClass() {
+    return LuceneServiceMXBean.class;
+  }
+
+  /**
+   * Passes a Lucene index on to the bridge.
+   *
+   * @param index the index to add
+   */
+  public void addIndex(LuceneIndex index) {
+    bridge.addIndex(index);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/05e6d966/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/management/LuceneServiceMXBean.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/management/LuceneServiceMXBean.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/management/LuceneServiceMXBean.java
new file mode 100644
index 0000000..e19bc83
--- /dev/null
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/management/LuceneServiceMXBean.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.cache.lucene.internal.management; + +import com.gemstone.gemfire.management.internal.security.ResourceOperation; +import org.apache.geode.security.ResourcePermission.Operation; +import org.apache.geode.security.ResourcePermission.Resource; + +/** + * MBean that provides access to the {@link com.gemstone.gemfire.cache.lucene.LuceneService}. 
+ */
+@ResourceOperation(resource = Resource.CLUSTER, operation = Operation.READ)
+public interface LuceneServiceMXBean {
+
+  /**
+   * Lists the metrics of all {@link com.gemstone.gemfire.cache.lucene.LuceneIndex} instances
+   * defined in this member.
+   *
+   * @return an array with one {@link LuceneIndexMetrics} per LuceneIndex defined in this member
+   */
+  LuceneIndexMetrics[] listIndexMetrics();
+
+  /**
+   * Lists the metrics of the {@link com.gemstone.gemfire.cache.lucene.LuceneIndex} instances
+   * that are defined on the given region in this member.
+   *
+   * @param regionPath The full path of the region whose indexes are listed
+   *
+   * @return an array with one {@link LuceneIndexMetrics} per LuceneIndex defined on the region
+   *         in this member
+   */
+  LuceneIndexMetrics[] listIndexMetrics(String regionPath);
+
+  /**
+   * Gets the metrics of the {@link com.gemstone.gemfire.cache.lucene.LuceneIndex} that has the
+   * given name and is defined on the given region in this member.
+   *
+   * @param regionPath The full path of the region the index is defined on
+   * @param indexName The name of the index
+   *
+   * @return the {@link LuceneIndexMetrics} of the LuceneIndex with the given name on the given
+   *         region in this member
+   */
+  LuceneIndexMetrics listIndexMetrics(String regionPath, String indexName);
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/05e6d966/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/management/ManagementIndexListener.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/management/ManagementIndexListener.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/management/ManagementIndexListener.java
new file mode 100644
index 0000000..f88058a
--- /dev/null
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/management/ManagementIndexListener.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.lucene.internal.management;
+
+import com.gemstone.gemfire.cache.lucene.LuceneIndex;
+import com.gemstone.gemfire.cache.lucene.internal.IndexListenerAdapter;
+
+/**
+ * Index listener that hands every newly created index to a {@link LuceneServiceMBean}.
+ */
+public class ManagementIndexListener extends IndexListenerAdapter {
+
+  private final LuceneServiceMBean mbean;
+
+  /**
+   * @param mbean the bean that is told about created indexes
+   */
+  public ManagementIndexListener(LuceneServiceMBean mbean) {
+    this.mbean = mbean;
+  }
+
+  /** Adds the created index to the bean. */
+  @Override
+  public void afterIndexCreated(LuceneIndex index) {
+    this.mbean.addIndex(index);
+  }
+
+  /** Does nothing; the bean is not told about destroyed indexes. */
+  @Override
+  public void beforeIndexDestroyed(LuceneIndex index) {
+    // intentionally empty
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/05e6d966/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/management/StatsKey.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/management/StatsKey.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/management/StatsKey.java
new file mode 100644
index 0000000..3438937
--- /dev/null
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/management/StatsKey.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.lucene.internal.management;
+
+/**
+ * String constants naming the Lucene index statistics, grouped by queries, updates, commits and
+ * documents.
+ */
+public class StatsKey {
+
+  // Query statistics
+  public static final String QUERIES = "queryExecutions";
+  public static final String QUERY_TIME = "queryExecutionTime";
+  public static final String QUERIES_IN_PROGRESS = "queryExecutionsInProgress";
+  public static final String QUERIES_TOTAL_HITS = "queryExecutionTotalHits";
+
+  // Update statistics
+  public static final String UPDATES = "updates";
+  public static final String UPDATE_TIME = "updateTime";
+  public static final String UPDATES_IN_PROGRESS = "updatesInProgress";
+
+  // Commit statistics
+  public static final String COMMITS = "commits";
+  public static final String COMMIT_TIME = "commitTime";
+  public static final String COMMITS_IN_PROGRESS = "commitsInProgress";
+
+  // Document statistics
+  public static final String DOCUMENTS = "documents";
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/05e6d966/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/package-info.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/package-info.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/package-info.java
new file mode 100644
index 0000000..22670f3
--- /dev/null
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Internal lucene classes, not intended to be used directly. + */ + +package com.gemstone.gemfire.cache.lucene.internal; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/05e6d966/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/repository/IndexRepository.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/repository/IndexRepository.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/repository/IndexRepository.java new file mode 100644 index 0000000..e487884 --- /dev/null +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/repository/IndexRepository.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.gemstone.gemfire.cache.lucene.internal.repository;
+
+import java.io.IOException;
+
+import com.gemstone.gemfire.cache.Region;
+
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.search.Query;
+
+/**
+ * A repository interface for writing data to lucene.
+ */
+public interface IndexRepository {
+
+  /**
+   * Create a new entry in the lucene index.
+   *
+   * @throws IOException if an I/O error occurs
+   */
+  void create(Object key, Object value) throws IOException;
+
+  /**
+   * Update the entries in the lucene index.
+   *
+   * @throws IOException if an I/O error occurs
+   */
+  void update(Object key, Object value) throws IOException;
+
+  /**
+   * Delete the entries in the lucene index.
+   *
+   * @throws IOException if an I/O error occurs
+   */
+  void delete(Object key) throws IOException;
+
+  /**
+   * Query the index repository, passing the results to the collector.
+   * Only the documents with the top scores, up to the limit, will be passed
+   * to the collector, in order of score.
+   *
+   * @param query the query to run
+   * @param limit the maximum number of hits to return
+   * @param collector the class to aggregate the hits
+   *
+   * @throws IOException if an I/O error occurs
+   */
+  void query(Query query, int limit, IndexResultCollector collector) throws IOException;
+
+  /**
+   * Commit the changes to all lucene indexes.
+   *
+   * @throws IOException if an I/O error occurs
+   */
+  void commit() throws IOException;
+
+  Region<?, ?> getRegion();
+
+  /**
+   * Check to see if this repository is closed due to
+   * underlying resources being closed or destroyed.
+   *
+   * @return true if this repository is closed.
+   */
+  boolean isClosed();
+
+  /**
+   * For debugging purposes, return the underlying IndexWriter.
+   */
+  IndexWriter getWriter();
+
+  /**
+   * Clean up any resources associated with this index repository.
+   */
+  void cleanup();
+}
