This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 8fd7d467ce073a179204107954ed8d475dbed125 Merge: 9627b2a 1d6f811 Author: Michael Blow <[email protected]> AuthorDate: Tue May 26 15:34:42 2020 -0400 Merge branch 'gerrit/mad-hatter' Change-Id: I1ed2ad00b1fb6ef5fe70e7a1d6a753d8da59e269 .../org/apache/asterix/app/nc/HaltCallback.java | 5 +- .../PersistentLocalResourceRepositoryTest.java | 53 ++++++++++++++++++++++ .../common/context/GlobalVirtualBufferCache.java | 2 +- .../ioopcallbacks/LSMIOOperationCallback.java | 32 ++++++++++++- .../asterix/common/storage/ResourceReference.java | 14 ++++++ .../replication/messaging/ComponentMaskTask.java | 4 +- .../PersistentLocalResourceRepository.java | 14 +----- .../java/org/apache/hyracks/util/ExitUtil.java | 2 +- .../main/java/org/apache/hyracks/util/Span.java | 21 ++++++++- 9 files changed, 123 insertions(+), 24 deletions(-) diff --cc asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/GlobalVirtualBufferCache.java index e3bd13d,0000000..45594eb mode 100644,000000..100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/GlobalVirtualBufferCache.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/GlobalVirtualBufferCache.java @@@ -1,517 -1,0 +1,517 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.asterix.common.context;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.asterix.common.config.StorageProperties;
import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
import org.apache.hyracks.api.replication.IIOReplicationManager;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
import org.apache.hyracks.storage.am.lsm.common.impls.VirtualBufferCache;
import org.apache.hyracks.storage.common.buffercache.ICacheMemoryAllocator;
import org.apache.hyracks.storage.common.buffercache.ICachedPage;
import org.apache.hyracks.storage.common.buffercache.IExtraPageBlockHelper;
import org.apache.hyracks.storage.common.buffercache.IFIFOPageWriter;
import org.apache.hyracks.storage.common.buffercache.IPageWriteCallback;
import org.apache.hyracks.storage.common.buffercache.IPageWriteFailureCallback;
import org.apache.hyracks.storage.common.buffercache.VirtualPage;
import org.apache.hyracks.storage.common.file.BufferedFileHandle;
import org.apache.hyracks.storage.common.file.IFileMapManager;
import org.apache.hyracks.util.ExitUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;

/**
 * A virtual buffer cache shared by all LSM memory components. Page management is delegated to a single
 * {@link VirtualBufferCache}. Once the global page usage reaches the flush threshold, a dedicated
 * {@link FlushThread} is asked to flush the registered primary indexes in round-robin order, one index at a time.
 * The page usage of filtered primary-index memory components is also tracked, so that such a component can be
 * reported full once it reaches {@code filteredMemoryComponentMaxNumPages}.
 */
public class GlobalVirtualBufferCache implements IVirtualBufferCache, ILifeCycleComponent {
    private static final Logger LOGGER = LogManager.getLogger();

    // keep track of the memory usage (in pages) of each filtered memory component
    private final Map<ILSMMemoryComponent, AtomicInteger> memoryComponentUsageMap =
            Collections.synchronizedMap(new HashMap<>());
    // file reference -> usage counter of the filtered memory component owning that file
    private final Map<FileReference, AtomicInteger> fileRefUsageMap = Collections.synchronizedMap(new HashMap<>());
    // file id -> usage counter; filled in when a tracked file reference is created or opened
    private final Int2ObjectMap<AtomicInteger> fileIdUsageMap =
            Int2ObjectMaps.synchronize(new Int2ObjectOpenHashMap<>());

    // registered primary indexes, flushed round-robin starting at flushPtr (mutated while holding "this")
    private final List<ILSMIndex> primaryIndexes = new ArrayList<>();
    private volatile int flushPtr;
    // the primary index whose flush is currently being waited on, or null if none
    private volatile ILSMIndex flushingIndex;

    private final int filteredMemoryComponentMaxNumPages;
    private final int flushPageBudget;
    private final VirtualBufferCache vbc;
    private final AtomicBoolean isOpen = new AtomicBoolean(false);
    private final FlushThread flushThread = new FlushThread();

    public GlobalVirtualBufferCache(ICacheMemoryAllocator allocator, StorageProperties storageProperties) {
        this.vbc = new VirtualBufferCache(allocator, storageProperties.getBufferCachePageSize(),
                (int) (storageProperties.getMemoryComponentGlobalBudget()
                        / storageProperties.getMemoryComponentPageSize()));
        // number of used pages at which the flush thread starts scheduling flushes
        this.flushPageBudget = (int) (storageProperties.getMemoryComponentGlobalBudget()
                / storageProperties.getMemoryComponentPageSize()
                * storageProperties.getMemoryComponentFlushThreshold());
        this.filteredMemoryComponentMaxNumPages = storageProperties.getFilteredMemoryComponentMaxNumPages();
    }

    @Override
    public int getPageSize() {
        return vbc.getPageSize();
    }

    @Override
    public int getPageSizeWithHeader() {
        return vbc.getPageSizeWithHeader();
    }

    /**
     * Registers a memory component. Its index is added to the flush rotation if it is a primary index not yet
     * registered. A filtered primary index additionally gets a fresh page-usage counter, which is shared by all
     * of the component's file references.
     */
    @Override
    public synchronized void register(ILSMMemoryComponent memoryComponent) {
        ILSMIndex index = memoryComponent.getLsmIndex();
        if (index.isPrimaryIndex()) {
            if (!primaryIndexes.contains(index)) {
                // make sure only add index once
                primaryIndexes.add(index);
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("Registered {} index {} to the global VBC",
                            isMetadataIndex(index) ? "metadata" : "primary", index.toString());
                }
            }
            if (index.getNumOfFilterFields() > 0) {
                // handle filtered primary index
                AtomicInteger usage = new AtomicInteger();
                memoryComponentUsageMap.put(memoryComponent, usage);
                for (FileReference ref : memoryComponent.getComponentFileRefs().getFileReferences()) {
                    if (ref != null) {
                        fileRefUsageMap.put(ref, usage);
                    }
                }
            }
        }
    }

    /**
     * Unregisters a memory component. Its primary index is removed from the flush rotation, flushPtr is kept
     * valid, and any filtered-usage bookkeeping for the component is dropped.
     */
    @Override
    public synchronized void unregister(ILSMMemoryComponent memoryComponent) {
        ILSMIndex index = memoryComponent.getLsmIndex();
        if (index.isPrimaryIndex()) {
            int pos = primaryIndexes.indexOf(index);
            if (pos >= 0) {
                primaryIndexes.remove(pos);
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("Unregistered {} index {} to the global VBC",
                            isMetadataIndex(index) ? "metadata" : "primary", index.toString());
                }
                if (primaryIndexes.isEmpty()) {
                    flushPtr = 0;
                } else {
                    if (flushPtr > pos) {
                        // The removed index was before flushPtr: shift back by one so that flushPtr still
                        // points to the same index.
                        flushPtr--;
                    }
                    if (flushPtr >= primaryIndexes.size()) {
                        // flushPtr pointed at the removed index and it was the last one: wrap around,
                        // otherwise the next scheduleFlush() would index past the end of the list.
                        flushPtr = 0;
                    }
                }
            }
            if (index.getNumOfFilterFields() > 0) {
                memoryComponentUsageMap.remove(memoryComponent);
                for (FileReference ref : memoryComponent.getComponentFileRefs().getFileReferences()) {
                    if (ref != null) {
                        fileRefUsageMap.remove(ref);
                    }
                }
            }
        }
    }

    /**
     * Called after a memory component has been flushed. If it belongs to the index being waited on, the wait is
     * ended, every primary index's op tracker is notified, and the flush thread may be woken up again. The usage
     * counter of a filtered primary memory component is reset to 0.
     */
    @Override
    public void flushed(ILSMMemoryComponent memoryComponent) throws HyracksDataException {
        if (memoryComponent.getLsmIndex() == flushingIndex) {
            synchronized (this) {
                if (memoryComponent.getLsmIndex() == flushingIndex) {
                    flushingIndex = null;
                    // After the flush operation is completed, we may have 2 cases:
                    // 1. there is no active reader on this memory component and memory is reclaimed;
                    // 2. there are still some active readers and memory cannot be reclaimed.
                    // But for both cases, we will notify all primary index op trackers to let their writers retry,
                    // if they have been blocked. Moreover, we will check whether more flushes are needed.
                    final int size = primaryIndexes.size();
                    for (int i = 0; i < size; i++) {
                        ILSMOperationTracker opTracker = primaryIndexes.get(i).getOperationTracker();
                        synchronized (opTracker) {
                            opTracker.notifyAll();
                        }
                    }

                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("Completed flushing {}. Resetting flushIndex back to null.",
                                memoryComponent.getIndex().toString());
                    }
                }
            }
            checkAndNotifyFlushThread();
        }
        if (memoryComponent.getLsmIndex().getNumOfFilterFields() > 0
                && memoryComponent.getLsmIndex().isPrimaryIndex()) {
            AtomicInteger usage = memoryComponentUsageMap.get(memoryComponent);
            if (usage != null) {
                // reset usage to 0 after the memory component is flushed
                usage.set(0);
            }
        }
    }

    @Override
    public int getPageBudget() {
        return vbc.getPageBudget();
    }

    @Override
    public boolean isFull() {
        return vbc.isFull();
    }

    /**
     * A memory component is considered full if its index is currently being flushed by the global VBC, or if it
     * is a filtered memory component that has reached its page limit.
     */
    @Override
    public boolean isFull(ILSMMemoryComponent memoryComponent) {
        return memoryComponent.getLsmIndex() == flushingIndex || isFilteredMemoryComponentFull(memoryComponent);
    }

    private boolean isFilteredMemoryComponentFull(ILSMMemoryComponent memoryComponent) {
        if (filteredMemoryComponentMaxNumPages <= 0 || memoryComponent.getLsmIndex().getNumOfFilterFields() == 0
                || !memoryComponent.getLsmIndex().isPrimaryIndex()) {
            return false;
        }
        AtomicInteger usage = memoryComponentUsageMap.get(memoryComponent);
        // a component without a usage entry (e.g. already unregistered) is not tracked, hence not full
        return usage != null && usage.get() >= filteredMemoryComponentMaxNumPages;
    }

    @Override
    public int createFile(FileReference fileRef) throws HyracksDataException {
        int fileId = vbc.createFile(fileRef);
        updateFileIdUsageMap(fileRef, fileId);
        return fileId;
    }

    @Override
    public int openFile(FileReference fileRef) throws HyracksDataException {
        int fileId = vbc.openFile(fileRef);
        updateFileIdUsageMap(fileRef, fileId);
        return fileId;
    }

    // link a file id to the usage counter of its filtered memory component, if the file reference is tracked
    private void updateFileIdUsageMap(FileReference fileRef, int fileId) {
        AtomicInteger usage = fileRefUsageMap.get(fileRef);
        if (usage != null) {
            fileIdUsageMap.put(fileId, usage);
        }
    }

    @Override
    public void openFile(int fileId) throws HyracksDataException {
        vbc.openFile(fileId);
    }

    @Override
    public void closeFile(int fileId) throws HyracksDataException {
        vbc.closeFile(fileId);
    }

    @Override
    public void deleteFile(FileReference fileRef) throws HyracksDataException {
        vbc.deleteFile(fileRef);
    }

    @Override
    public void deleteFile(int fileId) throws HyracksDataException {
        vbc.deleteFile(fileId);
    }

    /**
     * Pins a page. A newly allocated page is counted against its filtered memory component (if any), and the
     * flush thread is notified if the flush threshold has been reached.
     */
    @Override
    public ICachedPage pin(long dpid, boolean newPage) throws HyracksDataException {
        ICachedPage page = vbc.pin(dpid, newPage);
        if (newPage) {
            incrementFilteredMemoryComponentUsage(dpid, 1);
            checkAndNotifyFlushThread();
        }
        return page;
    }

    private void incrementFilteredMemoryComponentUsage(long dpid, int pages) {
        if (filteredMemoryComponentMaxNumPages > 0) {
            // update memory usage of filtered index
            AtomicInteger usage = fileIdUsageMap.get(BufferedFileHandle.getFileId(dpid));
            if (usage != null) {
                usage.addAndGet(pages);
                // We do not need extra code to flush this filtered memory component when it becomes full.
                // This method is only called when there are active writers on this memory component.
                // When the writer exits, it'll automatically flush this memory component when it finds out
                // that this memory component becomes full.
            }
        }
    }

    private void checkAndNotifyFlushThread() {
        if (vbc.getUsage() < flushPageBudget || flushingIndex != null) {
            // For better performance, we only flush one dataset partition at a time.
            // After reclaiming memory from this dataset partition, its memory can be used by other indexes.
            // Thus, given N dataset partitions, each dataset partition will approximately receive 2/N of
            // the total memory instead of 1/N, which doubles the memory utilization.
            return;
        }
        // Notify the flush thread to schedule flushes. This is used to avoid deadlocks because page pins can be
        // called while synchronizing on op trackers.
        synchronized (flushThread.flushLock) {
            flushThread.flushLock.notifyAll();
        }
    }

    /**
     * Resizes a page and applies the change in its frame size multiplier to the owning filtered memory component.
     */
    @Override
    public void resizePage(ICachedPage cPage, int multiplier, IExtraPageBlockHelper extraPageBlockHelper)
            throws HyracksDataException {
        // The delta must be computed before resizing: afterwards the page already reports the new multiplier,
        // which would make the delta 0 and hide the change from the usage tracking and the flush check.
        int delta = multiplier - cPage.getFrameSizeMultiplier();
        vbc.resizePage(cPage, multiplier, extraPageBlockHelper);
        incrementFilteredMemoryComponentUsage(((VirtualPage) cPage).dpid(), delta);
        if (delta > 0) {
            checkAndNotifyFlushThread();
        }
    }

    @Override
    public void unpin(ICachedPage page) throws HyracksDataException {
        vbc.unpin(page);
    }

    @Override
    public void flush(ICachedPage page) throws HyracksDataException {
        vbc.flush(page);
    }

    @Override
    public void force(int fileId, boolean metadata) throws HyracksDataException {
        vbc.force(fileId, metadata);
    }

    @Override
    public void open() throws HyracksDataException {
        // no op: the underlying cache is opened by start()
    }

    @Override
    public void close() throws HyracksDataException {
        // no op
    }

    @Override
    public void start() {
        if (isOpen.compareAndSet(false, true)) {
            try {
                vbc.open();
            } catch (HyracksDataException e) {
                throw new IllegalStateException("Fail to open virtual buffer cache ", e);
            }
            flushThread.start();
        }
    }

    @Override
    public void stop(boolean dumpState, OutputStream outputStream) throws IOException {
        if (isOpen.compareAndSet(true, false)) {
            if (dumpState) {
                dumpState(outputStream);
            }
            vbc.close();
            // isOpen is already false here, so the flush thread either sees it before waiting or receives
            // this notification while waiting
            synchronized (flushThread.flushLock) {
                flushThread.flushLock.notifyAll();
            }
            try {
                flushThread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw HyracksDataException.create(e);
            }
        }
    }

    @Override
    public void dumpState(OutputStream os) throws IOException {
        os.write(vbc.toString().getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public IFileMapManager getFileMapProvider() {
        return vbc.getFileMapProvider();
    }

    @Override
    public int getNumPagesOfFile(int fileId) throws HyracksDataException {
        return vbc.getNumPagesOfFile(fileId);
    }

    @Override
    public void returnPage(ICachedPage page) {
        vbc.returnPage(page);
    }

    @Override
    public IFIFOPageWriter createFIFOWriter(IPageWriteCallback callback, IPageWriteFailureCallback failureCallback) {
        return vbc.createFIFOWriter(callback, failureCallback);
    }

    @Override
    public ICachedPage confiscatePage(long dpid) throws HyracksDataException {
        return vbc.confiscatePage(dpid);
    }

    @Override
    public ICachedPage confiscateLargePage(long dpid, int multiplier, int extraBlockPageId)
            throws HyracksDataException {
        return vbc.confiscateLargePage(dpid, multiplier, extraBlockPageId);
    }

    @Override
    public void returnPage(ICachedPage page, boolean reinsert) {
        vbc.returnPage(page, reinsert);
    }

    @Override
    public int getFileReferenceCount(int fileId) {
        return vbc.getFileReferenceCount(fileId);
    }

    @Override
    public boolean isReplicationEnabled() {
        return vbc.isReplicationEnabled();
    }

    @Override
    public IIOReplicationManager getIOReplicationManager() {
        return vbc.getIOReplicationManager();
    }

    @Override
    public void purgeHandle(int fileId) throws HyracksDataException {
        vbc.purgeHandle(fileId);
    }

    @Override
    public String toString() {
        return vbc.toString();
    }

    @Override
    public void closeFileIfOpen(FileReference fileRef) {
        vbc.closeFileIfOpen(fileRef);
    }

    @Override
    public int getUsage() {
        return vbc.getUsage();
    }

    // assumes every registered index uses a BaseOperationTracker (the cast below fails otherwise)
    private boolean isMetadataIndex(ILSMIndex index) {
        BaseOperationTracker opTracker = (BaseOperationTracker) index.getOperationTracker();
        return MetadataIndexImmutableProperties.isMetadataDataset(opTracker.getDatasetInfo().getDatasetID());
    }

    /**
     * We use a dedicated thread to schedule flushes to avoid deadlock. We cannot schedule flushes directly during
     * page pins because page pins can be called while synchronized on op trackers (e.g., when resetting a
     * memory component).
     */
    private class FlushThread extends Thread {
        private final Object flushLock = new Object();

        @Override
        public void run() {
            while (isOpen.get()) {
                synchronized (flushLock) {
                    // Re-check under the lock: stop() clears isOpen before it takes flushLock to notify, so this
                    // check guarantees the shutdown notification cannot be missed (which would hang stop()'s join).
                    if (!isOpen.get()) {
                        break;
                    }
                    try {
                        flushLock.wait();
                    } catch (InterruptedException e) {
                        LOGGER.error("Flushing thread is interrupted unexpectedly.", e);
                    }
                }
                if (isOpen.get()) {
                    try {
                        scheduleFlush();
                    } catch (Throwable e) {
                        LOGGER.error("Unexpected exception when trying to schedule flushes.", e);
                        ExitUtil.halt(ExitUtil.EC_IO_SCHEDULER_FAILED);
                    }
                }
            }
        }

        /**
         * While usage is at or above the flush budget and no flush is being waited on, walk the primary indexes
         * round-robin from flushPtr and request a flush of the next non-empty one. Walking stops after at most
         * one full cycle.
         */
        private void scheduleFlush() throws HyracksDataException {
            synchronized (GlobalVirtualBufferCache.this) {
                if (primaryIndexes.isEmpty()) {
                    // nothing registered that could be flushed (this also avoids get(0) and "% 0" below)
                    return;
                }
                int cycles = 0;
                while (vbc.getUsage() >= flushPageBudget && flushingIndex == null && cycles <= primaryIndexes.size()) {
                    // find the first modified memory component while avoiding infinite loops
                    while (cycles <= primaryIndexes.size()
                            && primaryIndexes.get(flushPtr).isCurrentMutableComponentEmpty()) {
                        flushPtr = (flushPtr + 1) % primaryIndexes.size();
                        cycles++;
                    }

                    ILSMIndex primaryIndex = primaryIndexes.get(flushPtr);
                    flushPtr = (flushPtr + 1) % primaryIndexes.size();
                    // we need to manually flush this memory component because it may be idle at this point
                    // note that this is different from flushing a filtered memory component
                    PrimaryIndexOperationTracker opTracker =
                            (PrimaryIndexOperationTracker) primaryIndex.getOperationTracker();
                    synchronized (opTracker) {
                        boolean flushable = !primaryIndex.isCurrentMutableComponentEmpty();
                        if (flushable && !opTracker.isFlushLogCreated()) {
                            // if the flush log has already been created, then we can simply wait for
                            // that flush to complete
                            opTracker.setFlushOnExit(true);
                            opTracker.flushIfNeeded();
                            // If the flush cannot be scheduled at this time, then there must be active writers.
                            // The flush will be eventually scheduled when writers exit
                            if (LOGGER.isInfoEnabled()) {
                                LOGGER.info("Requested {} flushing primary index {}",
                                        isMetadataIndex(primaryIndex) ? "metadata" : "primary",
                                        primaryIndex.toString());
                            }
                        }
                        if ((flushable || opTracker.isFlushLogCreated()) && !isMetadataIndex(primaryIndex)) {
                            // global vbc cannot wait on metadata indexes because metadata indexes support full
                            // ACID transactions. Waiting on metadata indexes can introduce deadlocks.
                            flushingIndex = primaryIndex;
                            LOGGER.debug("Waiting for flushing primary index {} to complete...", primaryIndex);
                            break;
                        }
                    }
                }
            }
        }
    }

}
