Repository: asterixdb Updated Branches: refs/heads/master 4085b48f6 -> c807e1c43
[NO ISSUE][STO] Move the IO threads from BufferCache to IOManager - user model changes: no - storage format changes: no - interface changes: no Details: Move the IO threads from BufferCache to IOManager to cover all IO uses that go through the IOManager. Change-Id: Ic02b456826ae7abc2619a7eec3f90b48717b0adb Reviewed-on: https://asterix-gerrit.ics.uci.edu/2417 Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Michael Blow <mb...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/c807e1c4 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/c807e1c4 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/c807e1c4 Branch: refs/heads/master Commit: c807e1c43ffed8207b2dddc02d7fac636f8fe0d1 Parents: 4085b48 Author: Abdullah Alamoudi <bamou...@gmail.com> Authored: Thu Feb 22 13:34:39 2018 -0800 Committer: Michael Blow <mb...@apache.org> Committed: Thu Feb 22 16:20:33 2018 -0800 ---------------------------------------------------------------------- .../app/bootstrap/TestNodeController.java | 2 +- .../dataflow/FeedRecordDataFlowController.java | 4 +- .../input/stream/SocketServerInputStream.java | 8 +- .../apache/hyracks/api/io/IAsyncRequest.java | 26 +++ .../org/apache/hyracks/api/io/IIOFuture.java | 27 --- .../org/apache/hyracks/api/io/IIOManager.java | 12 +- .../apache/hyracks/api/util/CleanupUtils.java | 42 +++- .../control/nc/NodeControllerService.java | 39 ++-- .../apache/hyracks/control/nc/io/IOManager.java | 218 +++++++++++-------- .../apache/hyracks/control/nc/io/IoRequest.java | 170 +++++++++++++++ .../hyracks/control/nc/io/IoRequestHandler.java | 61 ++++++ .../am/common/util/ResourceReleaseUtils.java | 26 --- .../storage/common/buffercache/BufferCache.java | 194 ++++------------- .../storage/common/buffercache/CachedPage.java | 44 +--- .../TestStorageManagerComponentHolder.java | 3 +- .../apache/hyracks/test/support/TestUtils.java | 18 +- .../storage/am/btree/BTreeStatsTest.java | 22 -- .../common/test/LSMIndexFileManagerTest.java | 3 +- 18 files changed, 514 insertions(+), 405 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c807e1c4/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java index 4d26d25..6c4d068 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java @@ -429,7 +429,7 @@ public class TestNodeController { public IHyracksTaskContext createTestContext(JobId jobId, int partition, boolean withMessaging) throws HyracksDataException { - IHyracksTaskContext ctx = TestUtils.create(KB32); + IHyracksTaskContext ctx = TestUtils.create(KB32, ExecutionTestUtil.integrationUtil.ncs[0].getIoManager()); if (withMessaging) { TaskUtil.put(HyracksConstants.KEY_MESSAGE, new VSizeFrame(ctx), ctx); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c807e1c4/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java index 25c71df..306a2a5 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java @@ -32,9 +32,9 @@ import org.apache.asterix.external.util.FeedLogManager; import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.util.CleanupUtils; import org.apache.hyracks.api.util.ExceptionUtils; import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; -import org.apache.hyracks.storage.am.common.util.ResourceReleaseUtils; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -178,7 +178,7 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl } private Throwable finish(Throwable failure) { - Throwable th = ResourceReleaseUtils.close(recordReader, null); + Throwable th = CleanupUtils.close(recordReader, null); th = DataflowUtils.close(tupleForwarder, th); closeSignal(); setState(State.STOPPED); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c807e1c4/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java index 471d23f..fb9a4a6 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java @@ -25,7 +25,7 @@ import java.net.Socket; import org.apache.asterix.external.api.AsterixInputStream; import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.storage.am.common.util.ResourceReleaseUtils; +import org.apache.hyracks.api.util.CleanupUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -103,11 +103,11 @@ public class SocketServerInputStream extends AsterixInputStream { @Override public synchronized void close() throws IOException { - Throwable failure = ResourceReleaseUtils.close(connectionStream, null); + Throwable failure = CleanupUtils.close(connectionStream, null); connectionStream = null; - failure = ResourceReleaseUtils.close(socket, failure); + failure = CleanupUtils.close(socket, failure); socket = null; - failure = ResourceReleaseUtils.close(server, failure); + failure = CleanupUtils.close(server, failure); server = null; if (failure != null) { throw HyracksDataException.create(failure); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c807e1c4/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IAsyncRequest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IAsyncRequest.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IAsyncRequest.java new file mode 100644 index 0000000..3efb2e1 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IAsyncRequest.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.api.io; + +@FunctionalInterface +public interface IAsyncRequest { + + void await() throws InterruptedException; + +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c807e1c4/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOFuture.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOFuture.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOFuture.java deleted file mode 100644 index 3de2ca1..0000000 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOFuture.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hyracks.api.io; - -import org.apache.hyracks.api.exceptions.HyracksDataException; - -public interface IIOFuture { - public int synchronize() throws HyracksDataException, InterruptedException; - - public boolean isComplete(); -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c807e1c4/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java index b0cc07a..ff1e47f 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java @@ -18,13 +18,13 @@ */ package org.apache.hyracks.api.io; +import java.io.Closeable; import java.nio.ByteBuffer; import java.util.List; -import java.util.concurrent.Executor; import org.apache.hyracks.api.exceptions.HyracksDataException; -public interface IIOManager { +public interface IIOManager extends Closeable { public enum FileReadWriteMode { READ_ONLY, READ_WRITE @@ -47,16 +47,16 @@ public interface IIOManager { public int syncRead(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException; - public IIOFuture asyncWrite(IFileHandle fHandle, long offset, ByteBuffer data); + IAsyncRequest asyncWrite(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException; - public IIOFuture asyncRead(IFileHandle fHandle, long offset, ByteBuffer data); + IAsyncRequest asyncWrite(IFileHandle fHandle, long offset, ByteBuffer[] dataArray) throws HyracksDataException; + + IAsyncRequest asyncRead(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException; public void close(IFileHandle fHandle) throws HyracksDataException; public void sync(IFileHandle fileHandle, boolean metadata) throws HyracksDataException; - public void setExecutor(Executor executor); - public long getSize(IFileHandle fileHandle); public void deleteWorkspaceFiles() throws HyracksDataException; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c807e1c4/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java index a67c133..008cffd 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java @@ -41,8 +41,8 @@ public class CleanupUtils { } catch (Throwable th) { // NOSONAR. Had to be done to satisfy contracts try { LOGGER.log(Level.WARN, "Failure destroying a destroyable resource", th); - } catch (Throwable ignore) { - // Do nothing + } catch (Throwable ignore) { // NOSONAR: Ignore catching Throwable + // NOSONAR Ignore logging failure } root = ExceptionUtils.suppress(root, th); } @@ -66,11 +66,11 @@ public class CleanupUtils { if (writer != null) { try { writer.close(); - } catch (Throwable th) { // NOSONAR Will be re-thrown + } catch (Throwable th) { // NOSONAR Will be suppressed try { LOGGER.log(Level.WARN, "Failure closing a closeable resource", th); - } catch (Throwable loggingFailure) { - // Do nothing + } catch (Throwable loggingFailure) { // NOSONAR: Ignore catching Throwable + // NOSONAR: Ignore logging failure } root = ExceptionUtils.suppress(root, th); } @@ -90,13 +90,39 @@ public class CleanupUtils { public static void fail(IFrameWriter writer, Throwable root) { try { writer.fail(); - } catch (Throwable th) { // NOSONAR Will be re-thrown + } catch (Throwable th) { // NOSONAR Will be suppressed try { LOGGER.log(Level.WARN, "Failure failing " + writer.getClass().getSimpleName(), th); - } catch (Throwable loggingFailure) { - // Do nothing + } catch (Throwable loggingFailure) { // NOSONAR: Ignore catching Throwable + // NOSONAR ignore logging failure } root.addSuppressed(th); } } + + /** + * Close the AutoCloseable and suppress any Throwable thrown by the close call. + * This method must NEVER throw any Throwable + * + * @param closable + * the resource to close + * @param root + * the first exception encountered during release of resources + * @return the root Throwable if not null or a new Throwable if any was thrown, otherwise, it returns null + */ + public static Throwable close(AutoCloseable closable, Throwable root) { + if (closable != null) { + try { + closable.close(); + } catch (Throwable th) { // NOSONAR Will be suppressed + try { + LOGGER.log(Level.WARN, "Failure closing a closeable resource", th); + } catch (Throwable loggingFailure) { // NOSONAR: Ignore catching Throwable + // NOSONAR ignore logging failure + } + root = ExceptionUtils.suppress(root, th); // NOSONAR + } + } + return root; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c807e1c4/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java index 0ccef1d..0b7254d 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java @@ -63,6 +63,7 @@ import org.apache.hyracks.api.job.JobParameterByteStore; import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager; import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager; import org.apache.hyracks.api.service.IControllerService; +import org.apache.hyracks.api.util.CleanupUtils; import org.apache.hyracks.api.util.InvokeUtil; import org.apache.hyracks.control.common.base.IClusterController; import org.apache.hyracks.control.common.config.ConfigManager; @@ -218,21 +219,26 @@ public class NodeControllerService implements IControllerService { Thread.currentThread().setUncaughtExceptionHandler(getLifeCycleComponentManager()); ioManager = new IOManager(IODeviceHandle.getDevices(ncConfig.getIODevices()), application.getFileDeviceResolver()); - - workQueue = new WorkQueue(id, Thread.NORM_PRIORITY); // Reserves MAX_PRIORITY of the heartbeat thread. - jobletMap = new ConcurrentHashMap<>(); - deployedJobSpecActivityClusterGraphMap = new Hashtable<>(); - timer = new Timer(true); - serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER, - new File(new File(NodeControllerService.class.getName()), id)); - memoryMXBean = ManagementFactory.getMemoryMXBean(); - gcMXBeans = ManagementFactory.getGarbageCollectorMXBeans(); - threadMXBean = ManagementFactory.getThreadMXBean(); - runtimeMXBean = ManagementFactory.getRuntimeMXBean(); - osMXBean = ManagementFactory.getOperatingSystemMXBean(); - getNodeControllerInfosAcceptor = new MutableObject<>(); - memoryManager = new MemoryManager((long) (memoryMXBean.getHeapMemoryUsage().getMax() * MEMORY_FUDGE_FACTOR)); - ioCounter = IOCounterFactory.INSTANCE.getIOCounter(); + try { + workQueue = new WorkQueue(id, Thread.NORM_PRIORITY); // Reserves MAX_PRIORITY of the heartbeat thread. + jobletMap = new ConcurrentHashMap<>(); + deployedJobSpecActivityClusterGraphMap = new Hashtable<>(); + timer = new Timer(true); + serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER, + new File(new File(NodeControllerService.class.getName()), id)); + memoryMXBean = ManagementFactory.getMemoryMXBean(); + gcMXBeans = ManagementFactory.getGarbageCollectorMXBeans(); + threadMXBean = ManagementFactory.getThreadMXBean(); + runtimeMXBean = ManagementFactory.getRuntimeMXBean(); + osMXBean = ManagementFactory.getOperatingSystemMXBean(); + getNodeControllerInfosAcceptor = new MutableObject<>(); + memoryManager = + new MemoryManager((long) (memoryMXBean.getHeapMemoryUsage().getMax() * MEMORY_FUDGE_FACTOR)); + ioCounter = IOCounterFactory.INSTANCE.getIOCounter(); + } catch (Throwable th) { // NOSONAR will be re-thrown + CleanupUtils.close(ioManager, th); + throw th; + } } public IOManager getIoManager() { @@ -271,7 +277,6 @@ public class NodeControllerService implements IControllerService { } private void init() throws Exception { - ioManager.setExecutor(executor); datasetPartitionManager = new DatasetPartitionManager(this, executor, ncConfig.getResultManagerMemory(), ncConfig.getResultTTL(), ncConfig.getResultSweepThreshold()); datasetNetworkManager = new DatasetNetworkManager(ncConfig.getResultListenAddress(), @@ -526,7 +531,7 @@ public class NodeControllerService implements IControllerService { }); } ipc.stop(); - + ioManager.close(); LOGGER.log(Level.INFO, "Stopped NodeControllerService"); } else { LOGGER.log(Level.ERROR, "Duplicate shutdown call; original: " + Arrays.toString(shutdownCallStack), http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c807e1c4/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java index 2742aaa..a527e92 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java @@ -30,43 +30,49 @@ import java.nio.file.Paths; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.concurrent.Executor; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.api.io.IFileDeviceResolver; import org.apache.hyracks.api.io.IFileHandle; -import org.apache.hyracks.api.io.IIOFuture; import org.apache.hyracks.api.io.IIOManager; import org.apache.hyracks.api.io.IODeviceHandle; +import org.apache.hyracks.api.util.InvokeUtil; import org.apache.hyracks.api.util.IoUtil; +import org.apache.hyracks.control.nc.io.IoRequest.State; import org.apache.hyracks.util.file.FileUtil; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; public class IOManager implements IIOManager { /* * Constants */ + private static final Logger LOGGER = LogManager.getLogger(); + private static final int IO_REQUEST_QUEUE_SIZE = 100; // TODO: Make configurable private static final String WORKSPACE_FILE_SUFFIX = ".waf"; private static final FilenameFilter WORKSPACE_FILES_FILTER = (dir, name) -> name.endsWith(WORKSPACE_FILE_SUFFIX); /* * Finals */ + private final ExecutorService executor; + private final BlockingQueue<IoRequest> submittedRequests; + private final BlockingQueue<IoRequest> freeRequests; private final List<IODeviceHandle> ioDevices; private final List<IODeviceHandle> workspaces; /* * Mutables */ - private Executor executor; private int workspaceIndex; private IFileDeviceResolver deviceComputer; - public IOManager(List<IODeviceHandle> devices, Executor executor, IFileDeviceResolver deviceComputer) - throws HyracksDataException { - this(devices, deviceComputer); - this.executor = executor; - } - public IOManager(List<IODeviceHandle> devices, IFileDeviceResolver deviceComputer) throws HyracksDataException { this.ioDevices = Collections.unmodifiableList(devices); checkDeviceValidity(devices); @@ -86,6 +92,21 @@ public class IOManager implements IIOManager { } workspaceIndex = 0; this.deviceComputer = deviceComputer; + submittedRequests = new ArrayBlockingQueue<>(IO_REQUEST_QUEUE_SIZE); + freeRequests = new ArrayBlockingQueue<>(IO_REQUEST_QUEUE_SIZE); + int numIoThreads = ioDevices.size() * 2; + executor = Executors.newFixedThreadPool(numIoThreads); + for (int i = 0; i < numIoThreads; i++) { + executor.execute(new IoRequestHandler(i, submittedRequests)); + } + } + + public IoRequest getOrAllocRequest() { + IoRequest request = freeRequests.poll(); + if (request == null) { + request = new IoRequest(this, submittedRequests, freeRequests); + } + return request; } private void checkDeviceValidity(List<IODeviceHandle> devices) throws HyracksDataException { @@ -106,11 +127,6 @@ public class IOManager implements IIOManager { } @Override - public void setExecutor(Executor executor) { - this.executor = executor; - } - - @Override public List<IODeviceHandle> getIODevices() { return ioDevices; } @@ -129,6 +145,39 @@ public class IOManager implements IIOManager { @Override public int syncWrite(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException { + IoRequest req = asyncWrite(fHandle, offset, data); + InvokeUtil.doUninterruptibly(req); + try { + if (req.getState() == State.OPERATION_SUCCEEDED) { + return req.getWrite(); + } else if (req.getState() == State.OPERATION_FAILED) { + throw req.getFailure(); + } else { + throw new IllegalStateException("Write request completed with state " + req.getState()); + } + } finally { + req.recycle(); + } + } + + @Override + public long syncWrite(IFileHandle fHandle, long offset, ByteBuffer[] dataArray) throws HyracksDataException { + IoRequest req = asyncWrite(fHandle, offset, dataArray); + InvokeUtil.doUninterruptibly(req); + try { + if (req.getState() == State.OPERATION_SUCCEEDED) { + return req.getWrites(); + } else if (req.getState() == State.OPERATION_FAILED) { + throw req.getFailure(); + } else { + throw new IllegalStateException("Write request completed with state " + req.getState()); + } + } finally { + req.recycle(); + } + } + + public int doSyncWrite(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException { try { if (fHandle == null) { throw new IllegalStateException("Trying to write to a deleted file."); @@ -152,8 +201,7 @@ public class IOManager implements IIOManager { } } - @Override - public long syncWrite(IFileHandle fHandle, long offset, ByteBuffer[] dataArray) throws HyracksDataException { + public long doSyncWrite(IFileHandle fHandle, long offset, ByteBuffer[] dataArray) throws HyracksDataException { try { if (fHandle == null) { throw new IllegalStateException("Trying to write to a deleted file."); @@ -197,6 +245,22 @@ public class IOManager implements IIOManager { */ @Override public int syncRead(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException { + IoRequest req = asyncRead(fHandle, offset, data); + InvokeUtil.doUninterruptibly(req); + try { + if (req.getState() == State.OPERATION_SUCCEEDED) { + return req.getRead(); + } else if (req.getState() == State.OPERATION_FAILED) { + throw req.getFailure(); + } else { + throw new IllegalStateException("Reqd request completed with state " + req.getState()); + } + } finally { + req.recycle(); + } + } + + public int doSyncRead(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException { try { int n = 0; int remaining = data.remaining(); @@ -223,16 +287,38 @@ public class IOManager implements IIOManager { } @Override - public IIOFuture asyncWrite(IFileHandle fHandle, long offset, ByteBuffer data) { - AsyncWriteRequest req = new AsyncWriteRequest((FileHandle) fHandle, offset, data); - executor.execute(req); + public IoRequest asyncWrite(IFileHandle fHandle, long offset, ByteBuffer[] dataArray) throws HyracksDataException { + IoRequest req = getOrAllocRequest(); + try { + req.write(fHandle, offset, dataArray); + } catch (HyracksDataException e) { + req.recycle(); + throw e; + } return req; } @Override - public IIOFuture asyncRead(IFileHandle fHandle, long offset, ByteBuffer data) { - AsyncReadRequest req = new AsyncReadRequest((FileHandle) fHandle, offset, data); - executor.execute(req); + public IoRequest asyncWrite(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException { + IoRequest req = getOrAllocRequest(); + try { + req.write(fHandle, offset, data); + } catch (HyracksDataException e) { + req.recycle(); + throw e; + } + return req; + } + + @Override + public IoRequest asyncRead(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException { + IoRequest req = getOrAllocRequest(); + try { + req.read(fHandle, offset, data); + } catch (HyracksDataException e) { + req.recycle(); + throw e; + } return req; } @@ -259,80 +345,6 @@ public class IOManager implements IIOManager { return dev.createFileRef(waPath + File.separator + waf.getName()); } - private abstract class AsyncRequest implements IIOFuture, Runnable { - protected final FileHandle fHandle; - protected final long offset; - protected final ByteBuffer data; - private boolean complete; - private HyracksDataException exception; - private int result; - - private AsyncRequest(FileHandle fHandle, long offset, ByteBuffer data) { - this.fHandle = fHandle; - this.offset = offset; - this.data = data; - complete = false; - exception = null; - } - - @Override - public void run() { - HyracksDataException hde = null; - int res = -1; - try { - res = performOperation(); - } catch (HyracksDataException e) { - hde = e; - } - synchronized (this) { - exception = hde; - result = res; - complete = true; - notifyAll(); - } - } - - protected abstract int performOperation() throws HyracksDataException; - - @Override - public synchronized int synchronize() throws HyracksDataException, InterruptedException { - while (!complete) { - wait(); - } - if (exception != null) { - throw exception; - } - return result; - } - - @Override - public synchronized boolean isComplete() { - return complete; - } - } - - private class AsyncReadRequest extends AsyncRequest { - private AsyncReadRequest(FileHandle fHandle, long offset, ByteBuffer data) { - super(fHandle, offset, data); - } - - @Override - protected int performOperation() throws HyracksDataException { - return syncRead(fHandle, offset, data); - } - } - - private class AsyncWriteRequest extends AsyncRequest { - private AsyncWriteRequest(FileHandle fHandle, long offset, ByteBuffer data) { - super(fHandle, offset, data); - } - - @Override - protected int performOperation() throws HyracksDataException { - return syncWrite(fHandle, offset, data); - } - } - @Override public void sync(IFileHandle fileHandle, boolean metadata) throws HyracksDataException { try { @@ -390,4 +402,18 @@ public class IOManager implements IIOManager { } return null; } + + @Override + public void close() throws IOException { + InvokeUtil.doUninterruptibly(() -> submittedRequests.put(IoRequestHandler.POISON_PILL)); + executor.shutdown(); + try { + if (!executor.awaitTermination(10, TimeUnit.SECONDS)) { + LOGGER.log(Level.WARN, "Failure shutting down {} executor service", getClass().getSimpleName()); + } + } catch (InterruptedException e) { + LOGGER.log(Level.WARN, "Interrupted while shutting down {} executor service", getClass().getSimpleName()); + Thread.currentThread().interrupt(); + } + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c807e1c4/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IoRequest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IoRequest.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IoRequest.java new file mode 100644 index 0000000..4235d19 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IoRequest.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.control.nc.io; + +import java.nio.ByteBuffer; +import java.util.concurrent.BlockingQueue; + +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.io.IAsyncRequest; +import org.apache.hyracks.api.io.IFileHandle; +import org.apache.hyracks.api.util.InvokeUtil.InterruptibleAction; + +public class IoRequest implements IAsyncRequest, InterruptibleAction { + + public enum State { + INITIAL, + READ_REQUESTED, + WRITE_REQUESTED, + OPERATION_FAILED, + OPERATION_SUCCEEDED + } + + private final IOManager ioManager; + private final BlockingQueue<IoRequest> submittedRequests; + private final BlockingQueue<IoRequest> freeRequests; + private State state; + private IFileHandle fHandle; + private long offset; + private ByteBuffer data; + private ByteBuffer[] dataArray; + private HyracksDataException failure; + private int read; + private int write; + private long writes; + + public IoRequest(IOManager ioManager, BlockingQueue<IoRequest> submittedRequests, + BlockingQueue<IoRequest> freeRequests) { + this.ioManager = ioManager; + this.submittedRequests = submittedRequests; + this.freeRequests = freeRequests; + reset(); + } + + public void reset() { + state = State.INITIAL; + fHandle = null; + data = null; + dataArray = null; + failure = null; + } + + public void read(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException { + if (state != State.INITIAL) { + throw new IllegalStateException("Can't request a read operation through a " + state + " request"); + } + state = State.READ_REQUESTED; + this.fHandle = fHandle; + this.offset = offset; + this.data = data; + queue(); + } + + public void write(IFileHandle fHandle, long offset, ByteBuffer[] dataArray) throws HyracksDataException { + if (state != State.INITIAL) { + throw new IllegalStateException("Can't request a write operation through a " + state + " request"); + } + state = State.WRITE_REQUESTED; + this.fHandle = fHandle; + this.offset = offset; + this.dataArray = dataArray; + queue(); + } + + public void write(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException { + if (state != State.INITIAL) { + throw new IllegalStateException("Can't request a write operation through a " + state + " request"); + } + state = State.WRITE_REQUESTED; + this.fHandle = fHandle; + this.offset = offset; + this.data = data; + queue(); + } + + private void queue() throws HyracksDataException { + try { + submittedRequests.put(this); + } catch (InterruptedException e) { // NOSONAR: The call below will re-interrupt + throw HyracksDataException.create(e); + } + } + + @Override + public void await() throws InterruptedException { + synchronized (this) { + while (state != State.OPERATION_FAILED && state != State.OPERATION_SUCCEEDED) { + wait(); + } + } + } + + synchronized void handle() { + try { + if (state == State.READ_REQUESTED) { + read = ioManager.doSyncRead(fHandle, offset, data); + } else if (state == State.WRITE_REQUESTED) { + if (data != null) { + // single buffer + write = ioManager.doSyncWrite(fHandle, offset, data); + } else { + // multiple buffers + writes = ioManager.doSyncWrite(fHandle, offset, dataArray); + } + } else { + throw new IllegalStateException("IO Request with state = " + state); + } + state = State.OPERATION_SUCCEEDED; + } catch (Throwable th) { // NOSONAR: This method must never throw anything + state = State.OPERATION_FAILED; + failure = HyracksDataException.create(th); + } + notifyAll(); + } + + public State getState() { + return state; + } + + void recycle() { + reset(); + freeRequests.offer(this); + } + + public int getRead() { + return read; + } + + public int getWrite() { + return write; + } + + public long getWrites() { + return writes; + } + + @Override + public void run() throws InterruptedException { + await(); + } + + public HyracksDataException getFailure() { + return failure; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c807e1c4/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IoRequestHandler.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IoRequestHandler.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IoRequestHandler.java new file mode 100644 index 0000000..84fd24b --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IoRequestHandler.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.control.nc.io; + +import java.util.concurrent.BlockingQueue; + +import org.apache.hyracks.api.util.InvokeUtil; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class IoRequestHandler implements Runnable { + private static final Logger LOGGER = LogManager.getLogger(); + public static final IoRequest POISON_PILL = new IoRequest(null, null, null); + private final int num; + private final BlockingQueue<IoRequest> queue; + + public IoRequestHandler(int num, BlockingQueue<IoRequest> queue) { + this.num = num; + this.queue = queue; + } + + @Override + public void run() { + Thread.currentThread().setName(getClass().getSimpleName() + "-" + num); + while (true) { // NOSONAR: Suppress 1 continue and 1 break + IoRequest next; + try { + next = queue.take(); + } catch (InterruptedException e) { // NOSONAR: This is not supposed to be ever interrupted + LOGGER.log(Level.WARN, "Ignoring interrupt. IO threads should never be interrupted."); + continue; + } + if (next == POISON_PILL) { + LOGGER.log(Level.INFO, "Exiting"); + InvokeUtil.doUninterruptibly(() -> queue.put(POISON_PILL)); + if (Thread.interrupted()) { + LOGGER.log(Level.ERROR, "Ignoring interrupt. IO threads should never be interrupted."); + } + break; + } + next.handle(); + } + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c807e1c4/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/util/ResourceReleaseUtils.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/util/ResourceReleaseUtils.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/util/ResourceReleaseUtils.java index b597b60..7861df0 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/util/ResourceReleaseUtils.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/util/ResourceReleaseUtils.java @@ -61,32 +61,6 @@ public class ResourceReleaseUtils { } /** - * Close the AutoCloseable and suppress any Throwable thrown by the close call. - * This method must NEVER throw any Throwable - * - * @param closable - * the resource to close - * @param root - * the first exception encountered during release of resources - * @return the root Throwable if not null or a new Throwable if any was thrown, otherwise, it returns null - */ - public static Throwable close(AutoCloseable closable, Throwable root) { - if (closable != null) { - try { - closable.close(); - } catch (Throwable th) { // NOSONAR Will be re-thrown - try { - LOGGER.log(Level.WARN, "Failure closing a closeable resource", th); - } catch (Throwable loggingFailure) { - // Do nothing - } - root = ExceptionUtils.suppress(root, th); - } - } - return root; - } - - /** * Close the IIndexDataflowHelper and suppress any Throwable thrown by the close call. * This method must NEVER throw any Throwable * http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c807e1c4/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java index 7f915c6..1443bbc 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java @@ -28,14 +28,11 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Queue; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -47,9 +44,7 @@ import org.apache.hyracks.api.io.IFileHandle; import org.apache.hyracks.api.io.IIOManager; import org.apache.hyracks.api.lifecycle.ILifeCycleComponent; import org.apache.hyracks.api.replication.IIOReplicationManager; -import org.apache.hyracks.api.util.InvokeUtil; import org.apache.hyracks.api.util.IoUtil; -import org.apache.hyracks.storage.common.buffercache.CachedPage.State; import org.apache.hyracks.storage.common.file.BufferedFileHandle; import org.apache.hyracks.storage.common.file.IFileMapManager; import org.apache.logging.log4j.Level; @@ -60,7 +55,6 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent { private static final Logger LOGGER = LogManager.getLogger(); private static final int MAP_FACTOR = 3; - private static final CachedPage POISON_PILL = new CachedPage(); private static final int MIN_CLEANED_COUNT_DIFF = 3; private static final int PIN_MAX_WAIT_TIME = 50; @@ -72,8 +66,7 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent { private final int pageSize; private final int maxOpenFiles; - private final ExecutorService executor; - private final IIOManager ioManager; + final IIOManager ioManager; private final CacheBucket[] pageMap; private final IPageReplacementStrategy pageReplacementStrategy; private final IPageCleanerPolicy pageCleanerPolicy; @@ -82,7 +75,6 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent { private final Map<Integer, BufferedFileHandle> fileInfoMap; private final AsyncFIFOPageQueueManager fifoWriter; private final Queue<BufferCacheHeaderHelper> headerPageCache = new ConcurrentLinkedQueue<>(); - private final BlockingQueue<CachedPage> readRequests; //DEBUG private Level fileOpsLevel = Level.DEBUG; @@ -111,43 +103,19 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent { this.pageReplacementStrategy = pageReplacementStrategy; this.pageCleanerPolicy = pageCleanerPolicy; this.fileMapManager = fileMapManager; - int numReaders = ioManager.getIODevices().size() * 2; - readRequests = new ArrayBlockingQueue<>(pageReplacementStrategy.getMaxAllowedNumPages()); - executor = Executors.newFixedThreadPool(numReaders + 1, threadFactory); - try { - fileInfoMap = new HashMap<>(); - cleanerThread = new CleanerThread(); - executor.execute(cleanerThread); - for (int i = 0; i < numReaders; i++) { - executor.execute(new ReaderThread(i)); - } - closed = false; - fifoWriter = new AsyncFIFOPageQueueManager(this); - if (DEBUG) { - confiscatedPages = new ArrayList<>(); - confiscatedPagesOwner = new HashMap<>(); - confiscateLock = new ReentrantLock(); - pinnedPageOwner = new ConcurrentHashMap<>(); - } - } catch (Throwable th) { - try { - throw th; - } finally { - readRequests.offer(POISON_PILL); // NOSONAR will always succeed since the queue is empty - executor.shutdown(); - try { - if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { - LOGGER.log(Level.WARN, "Failure shutting down buffer cache executor service"); - } - } catch (InterruptedException e) { - LOGGER.log(Level.WARN, "Interrupted while shutting down buffer cache executor service"); - Thread.currentThread().interrupt(); - th.addSuppressed(e); - } catch (Throwable e) { - LOGGER.log(Level.WARN, "Failure shutting down buffer cache executor service", e); - th.addSuppressed(e); - } - } + + Executor executor = Executors.newCachedThreadPool(threadFactory); + fileInfoMap = new HashMap<>(); + cleanerThread = new CleanerThread(); + executor.execute(cleanerThread); + closed = false; + + fifoWriter = new AsyncFIFOPageQueueManager(this); + if (DEBUG) { + confiscatedPages = new ArrayList<>(); + confiscatedPagesOwner = new HashMap<>(); + confiscateLock = new ReentrantLock(); + pinnedPageOwner = new ConcurrentHashMap<>(); } } @@ -201,48 +169,38 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent { pinSanityCheck(dpid); } CachedPage cPage = findPage(dpid); - if (cPage.state != State.VALID) { - synchronized (cPage) { - if (!newPage) { - if (DEBUG) { - confiscateLock.lock(); - try { - for (CachedPage c : confiscatedPages) { - if (c.dpid == dpid && c.confiscated.get()) { - throw new IllegalStateException(); - } - } - } finally { - confiscateLock.unlock(); + if (!newPage) { + if (DEBUG) { + confiscateLock.lock(); + try { + for (CachedPage c : confiscatedPages) { + if (c.dpid == dpid && c.confiscated.get()) { + throw new IllegalStateException(); } } - // Resolve race of multiple threads trying to read the page from - // disk. - - if (cPage.state != State.VALID) { - try { - // Will attempt to re-read even if previous read failed - if (cPage.state == State.INVALID || cPage.state == State.READ_FAILED) { - // submit request to read - cPage.state = State.READ_REQUESTED; - readRequests.put(cPage); - } - cPage.awaitRead(); - } catch (InterruptedException e) { - cPage.state = State.INVALID; - unpin(cPage); - throw HyracksDataException.create(e); - } catch (Throwable th) { + } finally { + confiscateLock.unlock(); + } + } + // Resolve race of multiple threads trying to read the page from + // disk. + synchronized (cPage) { + if (!cPage.valid) { + try { + tryRead(cPage); + cPage.valid = true; + } catch (Exception e) { + LOGGER.log(Level.WARN, "Failure while trying to read a page from disk", e); + throw e; + } finally { + if (!cPage.valid) { unpin(cPage); - throw HyracksDataException.create(th); } } - - } else { - cPage.state = State.VALID; - cPage.notifyAll(); } } + } else { + cPage.valid = true; } pageReplacementStrategy.notifyCachePageAccess(cPage); if (DEBUG) { @@ -491,7 +449,7 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent { buffer.append(" ").append(cp.cpid).append(" -> [") .append(BufferedFileHandle.getFileId(cp.dpid)).append(':') .append(BufferedFileHandle.getPageId(cp.dpid)).append(", ").append(cp.pinCount.get()) - .append(", ").append(cp.state).append(", ") + .append(", ").append(cp.valid ? "valid" : "invalid").append(", ") .append(cp.confiscated.get() ? "confiscated" : "physical").append(", ") .append(cp.dirty.get() ? "dirty" : "clean").append("]\n"); cp = cp.next; @@ -522,7 +480,7 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent { if (c.confiscated() || c.latch.getReadLockCount() != 0 || c.latch.getWriteHoldCount() != 0) { return false; } - if (c.state == State.VALID) { + if (c.valid) { reachableDpids.add(c.dpid); } } @@ -561,9 +519,6 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent { read(cPage); return; } catch (HyracksDataException readException) { - if (Thread.interrupted()) { - LOGGER.log(Level.WARN, "Ignoring interrupt. Reader threads should never be interrupted."); - } if (readException.getErrorCode() == ErrorCode.CANNOT_READ_CLOSED_FILE && i <= MAX_PAGE_READ_ATTEMPTS) { /** * if the read failure was due to another thread closing the file channel because @@ -575,7 +530,8 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent { LOGGER.log(Level.WARN, String.format("Failed to read page. Retrying attempt (%d/%d)", i + 1, MAX_PAGE_READ_ATTEMPTS), readException); } catch (InterruptedException e) { - LOGGER.log(Level.WARN, "Ignoring interrupt. Reader threads should never be interrupted."); + Thread.currentThread().interrupt(); + throw HyracksDataException.create(e); } } else { throw readException; @@ -714,56 +670,6 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent { } } - private class ReaderThread implements Runnable { - private final int num; - - private ReaderThread(int num) { - this.num = num; - } - - @Override - public void run() { - Thread.currentThread().setName("Buffer-Cache-Reader-" + num); - while (true) { - CachedPage next; - try { - next = readRequests.take(); - } catch (InterruptedException e) { - LOGGER.log(Level.WARN, "Ignoring interrupt. Reader threads should never be interrupted."); - break; - } - if (next == POISON_PILL) { - LOGGER.log(Level.INFO, "Exiting"); - InvokeUtil.doUninterruptibly(() -> readRequests.put(POISON_PILL)); - if (Thread.interrupted()) { - LOGGER.log(Level.ERROR, "Ignoring interrupt. Reader threads should never be interrupted."); - } - break; - } - synchronized (next) { - if (next.state != State.VALID) { - if (next.state != State.READ_REQUESTED) { - LOGGER.log(Level.ERROR, - "Exiting BufferCache reader thread. Took a page with state = {} out of the queue", - next.state); - break; - } - try { - tryRead(next); - next.state = State.VALID; - } catch (HyracksDataException e) { - next.readFailure = e; - next.state = State.READ_FAILED; - LOGGER.log(Level.WARN, "Failed to read a page", e); - } - next.notifyAll(); - } - } - } - } - - } - private class CleanerThread implements Runnable { private volatile boolean shutdownStart = false; private volatile boolean shutdownComplete = false; @@ -893,16 +799,6 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent { }); fileInfoMap.clear(); } - InvokeUtil.doUninterruptibly(() -> readRequests.put(POISON_PILL)); - executor.shutdown(); - try { - if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { - LOGGER.log(Level.WARN, "Failure shutting down buffer cache executor service"); - } - } catch (InterruptedException e) { - LOGGER.log(Level.WARN, "Interrupted while shutting down buffer cache executor service"); - Thread.currentThread().interrupt(); - } } @Override @@ -1447,7 +1343,7 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent { } try { cPage.reset(cPage.dpid); - cPage.state = State.VALID; + cPage.valid = true; cPage.next = bucket.cachedPage; bucket.cachedPage = cPage; cPage.pinCount.decrementAndGet(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c807e1c4/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java index d7a55af..02eb8bf 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java @@ -23,19 +23,10 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantReadWriteLock; -import org.apache.hyracks.api.exceptions.HyracksDataException; - /** * @author yingyib */ public class CachedPage implements ICachedPageInternal { - public enum State { - INVALID, - READ_REQUESTED, - READ_FAILED, - VALID - } - final int cpid; ByteBuffer buffer; public final AtomicInteger pinCount; @@ -45,7 +36,7 @@ public class CachedPage implements ICachedPageInternal { private final IPageReplacementStrategy pageReplacementStrategy; volatile long dpid; // disk page id (composed of file id and page id) CachedPage next; - volatile State state; + volatile boolean valid; final AtomicBoolean confiscated; private IQueueInfo queueInfo; private int multiplier; @@ -53,7 +44,6 @@ public class CachedPage implements ICachedPageInternal { // DEBUG private static final boolean DEBUG = false; private final StackTraceElement[] ctorStack; - Throwable readFailure; //Constructor for making dummy entry for FIFO queue public CachedPage() { @@ -82,7 +72,7 @@ public class CachedPage implements ICachedPageInternal { latch = new ReentrantReadWriteLock(true); replacementStrategyObject = pageReplacementStrategy.createPerPageStrategyObject(cpid); dpid = -1; - state = State.INVALID; + valid = false; confiscated = new AtomicBoolean(false); queueInfo = null; ctorStack = DEBUG ? new Throwable().getStackTrace() : null; @@ -91,7 +81,7 @@ public class CachedPage implements ICachedPageInternal { public void reset(long dpid) { this.dpid = dpid; dirty.set(false); - state = State.INVALID; + valid = false; confiscated.set(false); pageReplacementStrategy.notifyCachePageReset(this); queueInfo = null; @@ -215,30 +205,4 @@ public class CachedPage implements ICachedPageInternal { public boolean isLargePage() { return multiplier > 1; } - - /** - * Wait for the page requested to be read to complete the read operation - * This method is uninterrubtible - * - * @throws HyracksDataException - */ - public synchronized void awaitRead() throws HyracksDataException { - boolean interrupted = false; - try { - while (state != State.VALID && state != State.READ_FAILED) { - try { - wait(); - } catch (InterruptedException e) { - interrupted = true; - } - } - if (state == State.READ_FAILED) { - throw HyracksDataException.create(readFailure); - } - } finally { - if (interrupted) { - Thread.currentThread().interrupt(); - } - } - } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c807e1c4/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java index 3060b25..673a27f 100644 --- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java +++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java @@ -21,7 +21,6 @@ package org.apache.hyracks.test.support; import java.io.File; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import org.apache.hyracks.api.application.INCServiceContext; @@ -110,7 +109,7 @@ public class TestStorageManagerComponentHolder { List<IODeviceHandle> devices = new ArrayList<>(); devices.add(new IODeviceHandle(new File(System.getProperty("user.dir") + File.separator + "target"), "iodev_test_wa")); - ioManager = new IOManager(devices, Executors.newCachedThreadPool(), new DefaultDeviceResolver()); + ioManager = new IOManager(devices, new DefaultDeviceResolver()); } return ioManager; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c807e1c4/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java index d7578af..ebfaeb8 100644 --- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java +++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java @@ -26,7 +26,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.Executors; import org.apache.hyracks.api.application.INCServiceContext; import org.apache.hyracks.api.client.NodeControllerInfo; @@ -40,6 +39,7 @@ import org.apache.hyracks.api.dataflow.TaskId; import org.apache.hyracks.api.exceptions.HyracksException; import org.apache.hyracks.api.io.IODeviceHandle; import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.api.util.CleanupUtils; import org.apache.hyracks.control.nc.io.DefaultDeviceResolver; import org.apache.hyracks.control.nc.io.IOManager; import org.apache.logging.log4j.core.Appender; @@ -49,8 +49,20 @@ import org.apache.logging.log4j.core.config.Configuration; public class TestUtils { public static IHyracksTaskContext create(int frameSize) { + IOManager ioManager = null; + try { + ioManager = createIoManager(); + return create(frameSize, ioManager); + } catch (Exception e) { + if (ioManager != null) { + CleanupUtils.close(ioManager, e); + } + throw new RuntimeException(e); + } + } + + public static IHyracksTaskContext create(int frameSize, IOManager ioManager) { try { - IOManager ioManager = createIoManager(); INCServiceContext serviceCtx = new TestNCServiceContext(ioManager, null); TestJobletContext jobletCtx = new TestJobletContext(frameSize, serviceCtx, new JobId(0)); TaskAttemptId tid = new TaskAttemptId(new TaskId(new ActivityId(new OperatorDescriptorId(0), 0), 0), 0); @@ -64,7 +76,7 @@ public class TestUtils { private static IOManager createIoManager() throws HyracksException { List<IODeviceHandle> devices = new ArrayList<>(); devices.add(new IODeviceHandle(new File(System.getProperty("java.io.tmpdir")), ".")); - return new IOManager(devices, Executors.newCachedThreadPool(), new DefaultDeviceResolver()); + return new IOManager(devices, new DefaultDeviceResolver()); } public static void compareWithResult(File expectedFile, File actualFile) throws Exception { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c807e1c4/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/org/apache/hyracks/storage/am/btree/BTreeStatsTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/org/apache/hyracks/storage/am/btree/BTreeStatsTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/org/apache/hyracks/storage/am/btree/BTreeStatsTest.java index 2243ee3..ea22355 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/org/apache/hyracks/storage/am/btree/BTreeStatsTest.java +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/org/apache/hyracks/storage/am/btree/BTreeStatsTest.java @@ -73,85 +73,66 @@ public class BTreeStatsTest extends AbstractBTreeTest { @Test public void test01() throws Exception { - TestStorageManagerComponentHolder.init(PAGE_SIZE, NUM_PAGES, MAX_OPEN_FILES); IBufferCache bufferCache = harness.getBufferCache(); - // declare fields int fieldCount = 2; ITypeTraits[] typeTraits = new ITypeTraits[fieldCount]; typeTraits[0] = IntegerPointable.TYPE_TRAITS; typeTraits[1] = IntegerPointable.TYPE_TRAITS; - // declare keys int keyFieldCount = 1; IBinaryComparatorFactory[] cmpFactories = new IBinaryComparatorFactory[keyFieldCount]; cmpFactories[0] = PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY); - BTreeTypeAwareTupleWriterFactory tupleWriterFactory = new BTreeTypeAwareTupleWriterFactory(typeTraits, false); ITreeIndexFrameFactory leafFrameFactory = new BTreeNSMLeafFrameFactory(tupleWriterFactory); ITreeIndexFrameFactory interiorFrameFactory = new BTreeNSMInteriorFrameFactory(tupleWriterFactory); ITreeIndexMetadataFrameFactory metaFrameFactory = new LIFOMetaDataFrameFactory(); - IBTreeLeafFrame leafFrame = (IBTreeLeafFrame) leafFrameFactory.createFrame(); IBTreeInteriorFrame interiorFrame = (IBTreeInteriorFrame) interiorFrameFactory.createFrame(); ITreeIndexMetadataFrame metaFrame = metaFrameFactory.createFrame(); - IMetadataPageManager freePageManager = new LinkedMetaDataPageManager(bufferCache, metaFrameFactory); - BTree btree = new BTree(bufferCache, freePageManager, interiorFrameFactory, leafFrameFactory, cmpFactories, fieldCount, harness.getFileReference()); btree.create(); btree.activate(); - Random rnd = new Random(); rnd.setSeed(50); - long start = System.currentTimeMillis(); - if (LOGGER.isInfoEnabled()) { LOGGER.info("INSERTING INTO TREE"); } - IFrame frame = new VSizeFrame(ctx); FrameTupleAppender appender = new FrameTupleAppender(); ArrayTupleBuilder tb = new ArrayTupleBuilder(fieldCount); DataOutput dos = tb.getDataOutput(); - ISerializerDeserializer[] recDescSers = { IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE }; RecordDescriptor recDesc = new RecordDescriptor(recDescSers); IFrameTupleAccessor accessor = new FrameTupleAccessor(recDesc); accessor.reset(frame.getBuffer()); FrameTupleReference tuple = new FrameTupleReference(); - IndexAccessParameters actx = new IndexAccessParameters(TestOperationCallback.INSTANCE, TestOperationCallback.INSTANCE); ITreeIndexAccessor indexAccessor = btree.createAccessor(actx); // 10000 for (int i = 0; i < 100000; i++) { - int f0 = rnd.nextInt() % 100000; int f1 = 5; - tb.reset(); IntegerSerializerDeserializer.INSTANCE.serialize(f0, dos); tb.addFieldEndOffset(); IntegerSerializerDeserializer.INSTANCE.serialize(f1, dos); tb.addFieldEndOffset(); - appender.reset(frame, true); appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize()); - tuple.reset(accessor, 0); - if (LOGGER.isInfoEnabled()) { if (i % 10000 == 0) { long end = System.currentTimeMillis(); LOGGER.info("INSERTING " + i + " : " + f0 + " " + f1 + " " + (end - start)); } } - try { indexAccessor.insert(tuple); } catch (HyracksDataException e) { @@ -161,18 +142,15 @@ public class BTreeStatsTest extends AbstractBTreeTest { } } } - TreeIndexStatsGatherer statsGatherer = new TreeIndexStatsGatherer(bufferCache, freePageManager, harness.getFileReference(), btree.getRootPageId()); TreeIndexStats stats = statsGatherer.gatherStats(leafFrame, interiorFrame, metaFrame); if (LOGGER.isInfoEnabled()) { LOGGER.info("\n" + stats.toString()); } - TreeIndexBufferCacheWarmup bufferCacheWarmup = new TreeIndexBufferCacheWarmup(bufferCache, freePageManager, harness.getFileReference()); bufferCacheWarmup.warmup(leafFrame, metaFrame, new int[] { 1, 2 }, new int[] { 2, 5 }); - btree.deactivate(); btree.destroy(); bufferCache.close(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c807e1c4/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/LSMIndexFileManagerTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/LSMIndexFileManagerTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/LSMIndexFileManagerTest.java index 56618e7..8c1124f 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/LSMIndexFileManagerTest.java +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/LSMIndexFileManagerTest.java @@ -31,7 +31,6 @@ import java.util.Comparator; import java.util.Date; import java.util.LinkedList; import java.util.List; -import java.util.concurrent.Executors; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileReference; @@ -247,7 +246,7 @@ public class LSMIndexFileManagerTest { String iodevPath = System.getProperty("java.io.tmpdir") + sep + "test_iodev" + i; devices.add(new IODeviceHandle(new File(iodevPath), "wa")); } - return new IOManager(devices, Executors.newCachedThreadPool(), new DefaultDeviceResolver()); + return new IOManager(devices, new DefaultDeviceResolver()); } private FileReference simulateMerge(ILSMIndexFileManager fileManager, FileReference a, FileReference b)