Repository: asterixdb Updated Branches: refs/heads/master 47ab031e8 -> 7c72a503d
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7c72a503/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AsyncFIFOPageQueueManager.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AsyncFIFOPageQueueManager.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AsyncFIFOPageQueueManager.java index dbead1e..589d697 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AsyncFIFOPageQueueManager.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AsyncFIFOPageQueueManager.java @@ -24,15 +24,19 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.common.file.BufferedFileHandle; +import org.apache.hyracks.util.ExitUtil; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; public class AsyncFIFOPageQueueManager implements Runnable { - private final static boolean DEBUG = false; + private static final boolean DEBUG = false; + private static final Logger LOGGER = LogManager.getLogger(); - protected LinkedBlockingQueue<ICachedPage> queue = new LinkedBlockingQueue<ICachedPage>(); + protected LinkedBlockingQueue<ICachedPage> queue = new LinkedBlockingQueue<>(); volatile Thread writerThread; protected AtomicBoolean poisoned = new AtomicBoolean(false); protected BufferCache bufferCache; - volatile protected PageQueue pageQueue; + protected volatile PageQueue pageQueue; public AsyncFIFOPageQueueManager(BufferCache bufferCache) { this.bufferCache = bufferCache; @@ -57,17 +61,27 @@ public class AsyncFIFOPageQueueManager implements Runnable { return writer; } + @SuppressWarnings("squid:S2142") @Override - public void put(ICachedPage page) throws HyracksDataException { + public void put(ICachedPage page, IPageWriteFailureCallback callback) throws HyracksDataException { + failIfPreviousPageFailed(callback); + page.setFailureCallback(callback); try { if (!poisoned.get()) { queue.put(page); } else { - throw new HyracksDataException("Queue is closing"); + LOGGER.error("An attempt to write a page found buffer cache closed"); + ExitUtil.halt(ExitUtil.EC_ABNORMAL_TERMINATION); } } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw HyracksDataException.create(e); + LOGGER.error("IO Operation interrupted", e); + ExitUtil.halt(ExitUtil.EC_ABNORMAL_TERMINATION); + } + } + + private void failIfPreviousPageFailed(IPageWriteFailureCallback callback) throws HyracksDataException { + if (callback.hasFailed()) { + throw HyracksDataException.create(callback.getFailure()); } } } @@ -136,18 +150,21 @@ public class AsyncFIFOPageQueueManager implements Runnable { } } + @SuppressWarnings("squid:S2142") @Override public void run() { - if (DEBUG) - System.out.println("[FIFO] Writer started"); + if (DEBUG) { + LOGGER.info("[FIFO] Writer started"); + } boolean die = false; while (!die) { ICachedPage entry; try { entry = queue.take(); } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - break; + LOGGER.error("BufferCache Write Queue was interrupted", e); + ExitUtil.halt(ExitUtil.EC_ABNORMAL_TERMINATION); + return; // Keep compiler happy } if (entry.getQueueInfo() != null && entry.getQueueInfo().hasWaiters()) { synchronized (entry) { @@ -158,17 +175,11 @@ public class AsyncFIFOPageQueueManager implements Runnable { continue; } } - - if (DEBUG) - System.out.println("[FIFO] Write " + BufferedFileHandle.getFileId(((CachedPage) entry).dpid) + "," + if (DEBUG) { + LOGGER.info("[FIFO] Write " + BufferedFileHandle.getFileId(((CachedPage) entry).dpid) + "," + BufferedFileHandle.getPageId(((CachedPage) entry).dpid)); - - try { - pageQueue.getWriter().write(entry, bufferCache); - } catch (HyracksDataException e) { - //TODO: What do we do, if we could not write the page? - e.printStackTrace(); } + pageQueue.getWriter().write(entry, bufferCache); } } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7c72a503/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java index 02eb8bf..6ec12aa 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java @@ -23,10 +23,14 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + /** * @author yingyib */ public class CachedPage implements ICachedPageInternal { + private static final Logger LOGGER = LogManager.getLogger(); final int cpid; ByteBuffer buffer; public final AtomicInteger pinCount; @@ -44,6 +48,7 @@ public class CachedPage implements ICachedPageInternal { // DEBUG private static final boolean DEBUG = false; private final StackTraceElement[] ctorStack; + private IPageWriteFailureCallback failureCallback; //Constructor for making dummy entry for FIFO queue public CachedPage() { @@ -85,6 +90,7 @@ public class CachedPage implements ICachedPageInternal { confiscated.set(false); pageReplacementStrategy.notifyCachePageReset(this); queueInfo = null; + failureCallback = null; } public void invalidate() { @@ -103,11 +109,7 @@ public class CachedPage implements ICachedPageInternal { @Override public boolean isGoodVictim() { - if (confiscated.get()) { - return false; // i am not a good victim because i cant flush! - } else { - return pinCount.get() == 0; - } + return !confiscated.get() && pinCount.get() == 0; } @Override @@ -205,4 +207,21 @@ public class CachedPage implements ICachedPageInternal { public boolean isLargePage() { return multiplier > 1; } + + @Override + public void setFailureCallback(IPageWriteFailureCallback failureCallback) { + if (this.failureCallback != null) { + throw new IllegalStateException("failureCallback is already set"); + } + this.failureCallback = failureCallback; + } + + @Override + public void writeFailed(Exception e) { + if (failureCallback != null) { + failureCallback.writeFailed(this, e); + } else { + LOGGER.error("An IO Failure took place but the failure callback is not set", e); + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7c72a503/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FIFOLocalWriter.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FIFOLocalWriter.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FIFOLocalWriter.java index 856edbc..3d3ce3c 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FIFOLocalWriter.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FIFOLocalWriter.java @@ -15,35 +15,37 @@ package org.apache.hyracks.storage.common.buffercache; -import org.apache.hyracks.api.exceptions.ErrorCode; -import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.util.ExitUtil; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; public class FIFOLocalWriter implements IFIFOPageWriter { + private static final Logger LOGGER = LogManager.getLogger(); public static final FIFOLocalWriter INSTANCE = new FIFOLocalWriter(); - private static boolean DEBUG = false; + private static final boolean DEBUG = false; private FIFOLocalWriter() { } + @SuppressWarnings("squid:S1181") // System must halt on all IO errors @Override - public void write(ICachedPage page, BufferCache bufferCache) throws HyracksDataException { + public void write(ICachedPage page, BufferCache bufferCache) { CachedPage cPage = (CachedPage) page; try { bufferCache.write(cPage); - } catch (HyracksDataException e) { - if (e.getErrorCode() != ErrorCode.FILE_DOES_NOT_EXIST) { - throw HyracksDataException.create(e); - } + } catch (Exception e) { + page.writeFailed(e); + LOGGER.warn("Failed to write page {}", cPage, e); + } catch (Throwable th) { + // Halt + LOGGER.error("FIFOLocalWriter has encountered a fatal error", th); + ExitUtil.halt(ExitUtil.EC_ABNORMAL_TERMINATION); } finally { bufferCache.returnPage(cPage); if (DEBUG) { - System.out.println("[FIFO] Return page: " + cPage.cpid + "," + cPage.dpid); + LOGGER.error("[FIFO] Return page: {}, {}", cPage.cpid, cPage.dpid); } } } - @Override - public void sync(int fileId, BufferCache bufferCache) throws HyracksDataException { - bufferCache.force(fileId, true); - } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7c72a503/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/HaltOnFailureCallback.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/HaltOnFailureCallback.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/HaltOnFailureCallback.java new file mode 100644 index 0000000..0b748e1 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/HaltOnFailureCallback.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.storage.common.buffercache; + +import org.apache.hyracks.util.ExitUtil; + +public class HaltOnFailureCallback implements IPageWriteFailureCallback { + public static final HaltOnFailureCallback INSTANCE = new HaltOnFailureCallback(); + + private HaltOnFailureCallback() { + } + + @Override + public void writeFailed(ICachedPage page, Throwable failure) { + ExitUtil.halt(ExitUtil.EC_ABNORMAL_TERMINATION); + } + + @Override + public boolean hasFailed() { + return false; + } + + @Override + public Throwable getFailure() { + return null; + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7c72a503/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ICachedPage.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ICachedPage.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ICachedPage.java index 16837b9..cfbb145 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ICachedPage.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ICachedPage.java @@ -44,10 +44,14 @@ public interface ICachedPage { void setDiskPageId(long dpid); + void setFailureCallback(IPageWriteFailureCallback callback); + /** * Check if a page is a large page * * @return true if the page is large, false otherwise */ boolean isLargePage(); + + void writeFailed(Exception e); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7c72a503/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ICachedPageInternal.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ICachedPageInternal.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ICachedPageInternal.java index c500286..d900852 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ICachedPageInternal.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ICachedPageInternal.java @@ -19,13 +19,16 @@ package org.apache.hyracks.storage.common.buffercache; public interface ICachedPageInternal extends ICachedPage { - public int getCachedPageId(); + int getCachedPageId(); - public long getDiskPageId(); + long getDiskPageId(); - public Object getReplacementStrategyObject(); + Object getReplacementStrategyObject(); - public boolean isGoodVictim(); + /** + * @return true if can be evicted, false otherwise + */ + boolean isGoodVictim(); void setFrameSizeMultiplier(int multiplier); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7c72a503/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IFIFOPageQueue.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IFIFOPageQueue.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IFIFOPageQueue.java index 6c03671..189c402 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IFIFOPageQueue.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IFIFOPageQueue.java @@ -19,5 +19,20 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; @FunctionalInterface public interface IFIFOPageQueue { - void put(ICachedPage page) throws HyracksDataException; + + /** + * Put a page in the write queue + * + * @param page + * the page to be written + * @param callback + * callback in case of a failure + * @throws HyracksDataException + * if the callback has already failed. This indicates a failure writing a previous page + * in the same operation. + * Note: having this failure at this place removes the need to check for failures with + * every add() call in the bulk loader and so, we check per page given to disk rather + * than per tuple given to loader. At the same time, it allows the bulk load to fail early. + */ + void put(ICachedPage page, IPageWriteFailureCallback callback) throws HyracksDataException; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7c72a503/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IFIFOPageWriter.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IFIFOPageWriter.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IFIFOPageWriter.java index 567c01e..26fd414 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IFIFOPageWriter.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IFIFOPageWriter.java @@ -15,10 +15,7 @@ package org.apache.hyracks.storage.common.buffercache; -import org.apache.hyracks.api.exceptions.HyracksDataException; - +@FunctionalInterface public interface IFIFOPageWriter { - public void write(ICachedPage page, BufferCache bufferCache) throws HyracksDataException; - - void sync(int fileId, BufferCache bufferCache) throws HyracksDataException; + void write(ICachedPage page, BufferCache bufferCache); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7c72a503/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IPageWriteFailureCallback.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IPageWriteFailureCallback.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IPageWriteFailureCallback.java new file mode 100644 index 0000000..da9cb6a --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IPageWriteFailureCallback.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.storage.common.buffercache; + +public interface IPageWriteFailureCallback { + + /** + * Notify that an async write operation has failed + * + * @param page + * @param failure + */ + void writeFailed(ICachedPage page, Throwable failure); + + /** + * @return true if the callback has received any failure + */ + boolean hasFailed(); + + /** + * @return a failure writing to disk or null if no failure has been seen + * This doesn't guarantee which failure is returned but that if one or more failures occurred + * while trying to write to disk, one of those failures is returned. All other failures are expected + * to be logged. + */ + Throwable getFailure(); +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7c72a503/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/PageWriteFailureCallback.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/PageWriteFailureCallback.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/PageWriteFailureCallback.java new file mode 100644 index 0000000..c11e596 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/PageWriteFailureCallback.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.storage.common.buffercache; + +public class PageWriteFailureCallback implements IPageWriteFailureCallback { + + private volatile Throwable failure; + + @Override + public final void writeFailed(ICachedPage page, Throwable failure) { + if (this.failure == null) { + this.failure = failure; + } + } + + @Override + public final boolean hasFailed() { + return failure != null; + } + + @Override + public final Throwable getFailure() { + return failure; + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7c72a503/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/VirtualPage.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/VirtualPage.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/VirtualPage.java index d7ec4e9..b2a1ff2 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/VirtualPage.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/VirtualPage.java @@ -147,4 +147,14 @@ public class VirtualPage implements ICachedPage { str.append("}"); return str.toString(); } + + @Override + public void setFailureCallback(IPageWriteFailureCallback callback) { + throw new UnsupportedOperationException(); + } + + @Override + public void writeFailed(Exception e) { + throw new UnsupportedOperationException(); + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7c72a503/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/org/apache/hyracks/storage/am/btree/DiskBTreeSearchCursorTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/org/apache/hyracks/storage/am/btree/DiskBTreeSearchCursorTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/org/apache/hyracks/storage/am/btree/DiskBTreeSearchCursorTest.java index c77bea0..853a2ab 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/org/apache/hyracks/storage/am/btree/DiskBTreeSearchCursorTest.java +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/org/apache/hyracks/storage/am/btree/DiskBTreeSearchCursorTest.java @@ -183,6 +183,9 @@ public class DiskBTreeSearchCursorTest extends BTreeSearchCursorTest { bulkloader.add(tuple); } bulkloader.end(); + if (bulkloader.hasFailed()) { + throw HyracksDataException.create(bulkloader.getFailure()); + } } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7c72a503/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java index f94914c..d0d02a9 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java @@ -33,7 +33,6 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.commons.lang3.mutable.Mutable; import org.apache.commons.lang3.mutable.MutableObject; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -41,6 +40,7 @@ import org.apache.hyracks.api.exceptions.HyracksException; import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.api.io.IIOManager; import org.apache.hyracks.storage.common.buffercache.CachedPage; +import org.apache.hyracks.storage.common.buffercache.HaltOnFailureCallback; import org.apache.hyracks.storage.common.buffercache.IBufferCache; import org.apache.hyracks.storage.common.buffercache.ICachedPage; import org.apache.hyracks.storage.common.file.BufferedFileHandle; @@ -96,7 +96,7 @@ public class BufferCacheTest { long dpid = BufferedFileHandle.getDiskPageId(fileId, i); ICachedPage page = bufferCache.confiscatePage(dpid); page.getBuffer().putInt(0, i); - bufferCache.createFIFOQueue().put(page); + bufferCache.createFIFOQueue().put(page, HaltOnFailureCallback.INSTANCE); } bufferCache.finishQueue(); bufferCache.closeFile(fileId);