http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java new file mode 100644 index 0000000..e06d41e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java @@ -0,0 +1,533 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.igfs; + +import org.apache.ignite.*; +import org.apache.ignite.igfs.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.locks.*; + +/** + * Input stream to read data from grid cache with separate blocks. + */ +public class IgfsInputStreamImpl extends IgfsInputStreamAdapter { + /** Empty chunks result. */ + private static final byte[][] EMPTY_CHUNKS = new byte[0][]; + + /** Meta manager. */ + private final IgfsMetaManager meta; + + /** Data manager. */ + private final IgfsDataManager data; + + /** Secondary file system reader. */ + @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") + private final IgfsReader secReader; + + /** Logger. */ + private IgniteLogger log; + + /** Path to file. */ + protected final IgfsPath path; + + /** File descriptor. */ + private volatile IgfsFileInfo fileInfo; + + /** The number of already read bytes. Important! Access to the property is guarded by this object lock. */ + private long pos; + + /** Local cache. */ + private final Map<Long, IgniteInternalFuture<byte[]>> locCache; + + /** Maximum local cache size. */ + private final int maxLocCacheSize; + + /** Pending data read futures which were evicted from the local cache before completion. */ + private final Set<IgniteInternalFuture<byte[]>> pendingFuts; + + /** Pending futures lock. */ + private final Lock pendingFutsLock = new ReentrantLock(); + + /** Pending futures condition. */ + private final Condition pendingFutsCond = pendingFutsLock.newCondition(); + + /** Closed flag. */ + private boolean closed; + + /** Number of blocks to prefetch asynchronously. */ + private int prefetchBlocks; + + /** Numbed of blocks that must be read sequentially before prefetch is triggered. */ + private int seqReadsBeforePrefetch; + + /** Bytes read. */ + private long bytes; + + /** Index of the previously read block. Initially it is set to -1 indicating that no reads has been made so far. */ + private long prevBlockIdx = -1; + + /** Amount of sequential reads performed. */ + private int seqReads; + + /** Time consumed on reading. */ + private long time; + + /** Local GGFs metrics. */ + private final IgfsLocalMetrics metrics; + + /** + * Constructs file output stream. + * + * @param ggfsCtx GGFS context. + * @param path Path to stored file. + * @param fileInfo File info to write binary data to. + * @param prefetchBlocks Number of blocks to prefetch. + * @param seqReadsBeforePrefetch Amount of sequential reads before prefetch is triggered. + * @param secReader Optional secondary file system reader. + * @param metrics Local GGFS metrics. + */ + IgfsInputStreamImpl(IgfsContext ggfsCtx, IgfsPath path, IgfsFileInfo fileInfo, int prefetchBlocks, + int seqReadsBeforePrefetch, @Nullable IgfsReader secReader, IgfsLocalMetrics metrics) { + assert ggfsCtx != null; + assert path != null; + assert fileInfo != null; + assert metrics != null; + + this.path = path; + this.fileInfo = fileInfo; + this.prefetchBlocks = prefetchBlocks; + this.seqReadsBeforePrefetch = seqReadsBeforePrefetch; + this.secReader = secReader; + this.metrics = metrics; + + meta = ggfsCtx.meta(); + data = ggfsCtx.data(); + + log = ggfsCtx.kernalContext().log(IgfsInputStream.class); + + maxLocCacheSize = (prefetchBlocks > 0 ? prefetchBlocks : 1) * 3 / 2; + + locCache = new LinkedHashMap<>(maxLocCacheSize, 1.0f); + + pendingFuts = new GridConcurrentHashSet<>(prefetchBlocks > 0 ? prefetchBlocks : 1); + } + + /** + * Gets bytes read. + * + * @return Bytes read. + */ + public synchronized long bytes() { + return bytes; + } + + /** {@inheritDoc} */ + @Override public IgfsFileInfo fileInfo() { + return fileInfo; + } + + /** {@inheritDoc} */ + @Override public synchronized int read() throws IOException { + byte[] buf = new byte[1]; + + int read = read(buf, 0, 1); + + if (read == -1) + return -1; // EOF. + + return buf[0] & 0xFF; // Cast to int and cut to *unsigned* byte value. + } + + /** {@inheritDoc} */ + @Override public synchronized int read(@NotNull byte[] b, int off, int len) throws IOException { + int read = readFromStore(pos, b, off, len); + + if (read != -1) + pos += read; + + return read; + } + + /** {@inheritDoc} */ + @Override public synchronized void seek(long pos) throws IOException { + if (pos < 0) + throw new IOException("Seek position cannot be negative: " + pos); + + + this.pos = pos; + } + + /** {@inheritDoc} */ + @Override public synchronized long position() throws IOException { + return pos; + } + + /** {@inheritDoc} */ + @Override public synchronized int available() throws IOException { + long l = fileInfo.length() - pos; + + if (l < 0) + return 0; + + if (l > Integer.MAX_VALUE) + return Integer.MAX_VALUE; + + return (int)l; + } + + /** {@inheritDoc} */ + @Override public synchronized void readFully(long pos, byte[] buf) throws IOException { + readFully(pos, buf, 0, buf.length); + } + + /** {@inheritDoc} */ + @Override public synchronized void readFully(long pos, byte[] buf, int off, int len) throws IOException { + for (int readBytes = 0; readBytes < len; ) { + int read = readFromStore(pos + readBytes, buf, off + readBytes, len - readBytes); + + if (read == -1) + throw new EOFException("Failed to read stream fully (stream ends unexpectedly)" + + "[pos=" + pos + ", buf.length=" + buf.length + ", off=" + off + ", len=" + len + ']'); + + readBytes += read; + } + } + + /** {@inheritDoc} */ + @Override public synchronized int read(long pos, byte[] buf, int off, int len) throws IOException { + return readFromStore(pos, buf, off, len); + } + + /** {@inheritDoc} */ + @SuppressWarnings("IfMayBeConditional") + @Override public synchronized byte[][] readChunks(long pos, int len) throws IOException { + // Readable bytes in the file, starting from the specified position. + long readable = fileInfo.length() - pos; + + if (readable <= 0) + return EMPTY_CHUNKS; + + long startTime = System.nanoTime(); + + if (readable < len) + len = (int)readable; // Truncate expected length to available. + + assert len > 0; + + bytes += len; + + int start = (int)(pos / fileInfo.blockSize()); + int end = (int)((pos + len - 1) / fileInfo.blockSize()); + + int chunkCnt = end - start + 1; + + byte[][] chunks = new byte[chunkCnt][]; + + for (int i = 0; i < chunkCnt; i++) { + byte[] block = blockFragmentizerSafe(start + i); + + int blockOff = (int)(pos % fileInfo.blockSize()); + int blockLen = Math.min(len, block.length - blockOff); + + // If whole block can be used as result, do not do array copy. + if (blockLen == block.length) + chunks[i] = block; + else { + // Only first or last block can have non-full data. + assert i == 0 || i == chunkCnt - 1; + + chunks[i] = Arrays.copyOfRange(block, blockOff, blockOff + blockLen); + } + + len -= blockLen; + pos += blockLen; + } + + assert len == 0; + + time += System.nanoTime() - startTime; + + return chunks; + } + + /** {@inheritDoc} */ + @Override public synchronized void close() throws IOException { + try { + if (secReader != null) { + // Close secondary input stream. + secReader.close(); + + // Ensuring local cache futures completion. + for (IgniteInternalFuture<byte[]> fut : locCache.values()) { + try { + fut.get(); + } + catch (IgniteCheckedException ignore) { + // No-op. + } + } + + // Ensuring pending evicted futures completion. + while (!pendingFuts.isEmpty()) { + pendingFutsLock.lock(); + + try { + pendingFutsCond.await(100, TimeUnit.MILLISECONDS); + } + catch (InterruptedException ignore) { + // No-op. + } + finally { + pendingFutsLock.unlock(); + } + } + + // Safety to ensure no orphaned data blocks exist in case file was concurrently deleted. + if (!meta.exists(fileInfo.id())) + data.delete(fileInfo); + } + } + catch (IgniteCheckedException e) { + throw new IOError(e); // Something unrecoverable. + } + finally { + closed = true; + + metrics.addReadBytesTime(bytes, time); + + locCache.clear(); + } + } + + /** + * @param pos Position to start reading from. + * @param buf Data buffer to save read data to. + * @param off Offset in the buffer to write data from. + * @param len Length of the data to read from the stream. + * @return Number of actually read bytes. + * @throws IOException In case of any IO exception. + */ + private int readFromStore(long pos, byte[] buf, int off, int len) throws IOException { + if (pos < 0) + throw new IllegalArgumentException("Read position cannot be negative: " + pos); + + if (buf == null) + throw new NullPointerException("Destination buffer cannot be null."); + + if (off < 0 || len < 0 || buf.length < len + off) + throw new IndexOutOfBoundsException("Invalid buffer boundaries " + + "[buf.length=" + buf.length + ", off=" + off + ", len=" + len + ']'); + + if (len == 0) + return 0; // Fully read done: read zero bytes correctly. + + // Readable bytes in the file, starting from the specified position. + long readable = fileInfo.length() - pos; + + if (readable <= 0) + return -1; // EOF. + + long startTime = System.nanoTime(); + + if (readable < len) + len = (int)readable; // Truncate expected length to available. + + assert len > 0; + + byte[] block = blockFragmentizerSafe(pos / fileInfo.blockSize()); + + // Skip bytes to expected position. + int blockOff = (int)(pos % fileInfo.blockSize()); + + len = Math.min(len, block.length - blockOff); + + U.arrayCopy(block, blockOff, buf, off, len); + + bytes += len; + time += System.nanoTime() - startTime; + + return len; + } + + /** + * Method to safely retrieve file block. In case if file block is missing this method will check file map + * and update file info. This may be needed when file that we are reading is concurrently fragmented. + * + * @param blockIdx Block index to read. + * @return Block data. + * @throws IOException If read failed. + */ + private byte[] blockFragmentizerSafe(long blockIdx) throws IOException { + try { + try { + return block(blockIdx); + } + catch (IgfsCorruptedFileException e) { + if (log.isDebugEnabled()) + log.debug("Failed to fetch file block [path=" + path + ", fileInfo=" + fileInfo + + ", blockIdx=" + blockIdx + ", errMsg=" + e.getMessage() + ']'); + + // This failure may be caused by file being fragmented. + if (fileInfo.fileMap() != null && !fileInfo.fileMap().ranges().isEmpty()) { + IgfsFileInfo newInfo = meta.info(fileInfo.id()); + + // File was deleted. + if (newInfo == null) + throw new IgfsFileNotFoundException("Failed to read file block (file was concurrently " + + "deleted) [path=" + path + ", blockIdx=" + blockIdx + ']'); + + fileInfo = newInfo; + + // Must clear cache as it may have failed futures. + locCache.clear(); + + if (log.isDebugEnabled()) + log.debug("Updated input stream file info after block fetch failure [path=" + path + + ", fileInfo=" + fileInfo + ']'); + + return block(blockIdx); + } + + throw new IOException(e.getMessage(), e); + } + } + catch (IgniteCheckedException e) { + throw new IOException(e.getMessage(), e); + } + } + + /** + * @param blockIdx Block index. + * @return File block data. + * @throws IOException If failed. + * @throws IgniteCheckedException If failed. + */ + private byte[] block(long blockIdx) throws IOException, IgniteCheckedException { + assert blockIdx >= 0; + + IgniteInternalFuture<byte[]> bytesFut = locCache.get(blockIdx); + + if (bytesFut == null) { + if (closed) + throw new IOException("Stream is already closed: " + this); + + seqReads = (prevBlockIdx != -1 && prevBlockIdx + 1 == blockIdx) ? ++seqReads : 0; + + prevBlockIdx = blockIdx; + + bytesFut = dataBlock(fileInfo, blockIdx); + + assert bytesFut != null; + + addLocalCacheFuture(blockIdx, bytesFut); + } + + // Schedule the next block(s) prefetch. + if (prefetchBlocks > 0 && seqReads >= seqReadsBeforePrefetch - 1) { + for (int i = 1; i <= prefetchBlocks; i++) { + // Ensure that we do not prefetch over file size. + if (fileInfo.blockSize() * (i + blockIdx) >= fileInfo.length()) + break; + else if (locCache.get(blockIdx + i) == null) + addLocalCacheFuture(blockIdx + i, dataBlock(fileInfo, blockIdx + i)); + } + } + + byte[] bytes = bytesFut.get(); + + if (bytes == null) + throw new IgfsCorruptedFileException("Failed to retrieve file's data block (corrupted file?) " + + "[path=" + path + ", blockIdx=" + blockIdx + ']'); + + int blockSize = fileInfo.blockSize(); + + if (blockIdx == fileInfo.blocksCount() - 1) + blockSize = (int)(fileInfo.length() % blockSize); + + // If part of the file was reserved for writing, but was not actually written. + if (bytes.length < blockSize) + throw new IOException("Inconsistent file's data block (incorrectly written?)" + + " [path=" + path + ", blockIdx=" + blockIdx + ", blockSize=" + bytes.length + + ", expectedBlockSize=" + blockSize + ", fileBlockSize=" + fileInfo.blockSize() + + ", fileLen=" + fileInfo.length() + ']'); + + return bytes; + } + + /** + * Add local cache future. + * + * @param idx Block index. + * @param fut Future. + */ + private void addLocalCacheFuture(long idx, IgniteInternalFuture<byte[]> fut) { + assert Thread.holdsLock(this); + + if (!locCache.containsKey(idx)) { + if (locCache.size() == maxLocCacheSize) { + final IgniteInternalFuture<byte[]> evictFut = locCache.remove(locCache.keySet().iterator().next()); + + if (!evictFut.isDone()) { + pendingFuts.add(evictFut); + + evictFut.listenAsync(new IgniteInClosure<IgniteInternalFuture<byte[]>>() { + @Override public void apply(IgniteInternalFuture<byte[]> t) { + pendingFuts.remove(evictFut); + + pendingFutsLock.lock(); + + try { + pendingFutsCond.signalAll(); + } + finally { + pendingFutsLock.unlock(); + } + } + }); + } + } + + locCache.put(idx, fut); + } + } + + /** + * Get data block for specified block index. + * + * @param fileInfo File info. + * @param blockIdx Block index. + * @return Requested data block or {@code null} if nothing found. + * @throws IgniteCheckedException If failed. + */ + @Nullable protected IgniteInternalFuture<byte[]> dataBlock(IgfsFileInfo fileInfo, long blockIdx) throws IgniteCheckedException { + return data.dataBlock(fileInfo, path, blockIdx, secReader); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgfsInputStreamImpl.class, this); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInvalidRangeException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInvalidRangeException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInvalidRangeException.java new file mode 100644 index 0000000..8126487 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInvalidRangeException.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.igfs; + +import org.apache.ignite.*; + +/** + * Internal exception thrown when attempted to update range that is no longer present + * in file affinity map. + */ +public class IgfsInvalidRangeException extends IgniteCheckedException { + /** */ + private static final long serialVersionUID = 0L; + + /** + * @param msg Error message. + */ + public IgfsInvalidRangeException(String msg) { + super(msg); + } + + /** + * @param cause Error cause. + */ + public IgfsInvalidRangeException(Throwable cause) { + super(cause); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java new file mode 100644 index 0000000..a963bde --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java @@ -0,0 +1,562 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.igfs; + +import org.apache.ignite.*; +import org.apache.ignite.igfs.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.igfs.common.*; +import org.apache.ignite.internal.processors.closure.*; +import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.atomic.*; + +/** + * GGFS IPC handler. + */ +class IgfsIpcHandler implements IgfsServerHandler { + /** For test purposes only. */ + @SuppressWarnings("UnusedDeclaration") + private static boolean errWrite; + + /** Kernal context. */ + private final GridKernalContext ctx; + + /** Log. */ + private IgniteLogger log; + + /** Buffer size. */ + private final int bufSize; // Buffer size. Must not be less then file block size. + + /** Ggfs instance for this handler. */ + private IgfsEx ggfs; + + /** Resource ID generator. */ + private AtomicLong rsrcIdGen = new AtomicLong(); + + /** Stopping flag. */ + private volatile boolean stopping; + + /** + * Constructs GGFS IPC handler. + * + * @param ggfsCtx Context. + */ + IgfsIpcHandler(IgfsContext ggfsCtx) { + assert ggfsCtx != null; + + ctx = ggfsCtx.kernalContext(); + ggfs = ggfsCtx.ggfs(); + + // Keep buffer size multiple of block size so no extra byte array copies is performed. + bufSize = ggfsCtx.configuration().getBlockSize() * 2; + + log = ctx.log(IgfsIpcHandler.class); + } + + /** {@inheritDoc} */ + @Override public void stop() throws IgniteCheckedException { + stopping = true; + } + + /** {@inheritDoc} */ + @Override public void onClosed(IgfsClientSession ses) { + Iterator<Closeable> it = ses.registeredResources(); + + while (it.hasNext()) { + Closeable stream = it.next(); + + try { + stream.close(); + } + catch (IOException e) { + U.warn(log, "Failed to close opened stream on client close event (will continue) [ses=" + ses + + ", stream=" + stream + ']', e); + } + } + } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<IgfsMessage> handleAsync(final IgfsClientSession ses, + final IgfsMessage msg, DataInput in) { + try { + // Even if will be closed right after this call, response write error will be ignored. + if (stopping) + return null; + + final IgfsIpcCommand cmd = msg.command(); + + IgniteInternalFuture<IgfsMessage> fut; + + switch (cmd) { + // Execute not-blocking command synchronously in worker thread. + case WRITE_BLOCK: + case MAKE_DIRECTORIES: + case LIST_FILES: + case LIST_PATHS: { + IgfsMessage res = execute(ses, cmd, msg, in); + + fut = res == null ? null : new GridFinishedFuture<>(ctx, res); + + break; + } + + // Execute command asynchronously in user's pool. + default: { + fut = ctx.closure().callLocalSafe(new GridPlainCallable<IgfsMessage>() { + @Override public IgfsMessage call() throws Exception { + // No need to pass data input for non-write-block commands. + return execute(ses, cmd, msg, null); + } + }, GridClosurePolicy.GGFS_POOL); + } + } + + // Pack result object into response format. + return fut; + } + catch (Exception e) { + return new GridFinishedFuture<>(ctx, e); + } + } + + /** + * Execute GGFS command. + * + * @param ses Client connection session. + * @param cmd Command to execute. + * @param msg Message to process. + * @param in Data input in case of block write command. + * @return Command execution result. + * @throws Exception If failed. + */ + private IgfsMessage execute(IgfsClientSession ses, IgfsIpcCommand cmd, IgfsMessage msg, + @Nullable DataInput in) + throws Exception { + switch (cmd) { + case HANDSHAKE: + return processHandshakeRequest((IgfsHandshakeRequest)msg); + + case STATUS: + return processStatusRequest(); + + case EXISTS: + case INFO: + case PATH_SUMMARY: + case UPDATE: + case RENAME: + case DELETE: + case MAKE_DIRECTORIES: + case LIST_PATHS: + case LIST_FILES: + case SET_TIMES: + case AFFINITY: + case OPEN_READ: + case OPEN_CREATE: + case OPEN_APPEND: + return processPathControlRequest(ses, cmd, msg); + + case CLOSE: + case READ_BLOCK: + case WRITE_BLOCK: + return processStreamControlRequest(ses, cmd, msg, in); + + default: + throw new IgniteCheckedException("Unsupported IPC command: " + cmd); + } + } + + /** + * Processes handshake request. + * + * @param req Handshake request. + * @return Response message. + * @throws IgniteCheckedException In case of handshake failure. + */ + private IgfsMessage processHandshakeRequest(IgfsHandshakeRequest req) throws IgniteCheckedException { + if (!F.eq(ctx.gridName(), req.gridName())) + throw new IgniteCheckedException("Failed to perform handshake because actual Grid name differs from expected " + + "[expected=" + req.gridName() + ", actual=" + ctx.gridName() + ']'); + + if (!F.eq(ggfs.name(), req.ggfsName())) + throw new IgniteCheckedException("Failed to perform handshake because actual GGFS name differs from expected " + + "[expected=" + req.ggfsName() + ", actual=" + ggfs.name() + ']'); + + IgfsControlResponse res = new IgfsControlResponse(); + + ggfs.clientLogDirectory(req.logDirectory()); + + IgfsHandshakeResponse handshake = new IgfsHandshakeResponse(ggfs.name(), ggfs.proxyPaths(), + ggfs.groupBlockSize(), ggfs.globalSampling()); + + res.handshake(handshake); + + return res; + } + + /** + * Processes status request. + * + * @return Status response. + * @throws IgniteCheckedException If failed. + */ + private IgfsMessage processStatusRequest() throws IgniteCheckedException { + IgfsStatus status = ggfs.globalSpace(); + + IgfsControlResponse res = new IgfsControlResponse(); + + res.status(status); + + return res; + } + + /** + * Processes path control request. + * + * @param ses Session. + * @param cmd Command. + * @param msg Message. + * @return Response message. + * @throws IgniteCheckedException If failed. + */ + private IgfsMessage processPathControlRequest(IgfsClientSession ses, IgfsIpcCommand cmd, + IgfsMessage msg) throws IgniteCheckedException { + IgfsPathControlRequest req = (IgfsPathControlRequest)msg; + + if (log.isDebugEnabled()) + log.debug("Processing path control request [ggfsName=" + ggfs.name() + ", req=" + req + ']'); + + IgfsControlResponse res = new IgfsControlResponse(); + + try { + switch (cmd) { + case EXISTS: + res.response(ggfs.exists(req.path())); + + break; + + case INFO: + res.response(ggfs.info(req.path())); + + break; + + case PATH_SUMMARY: + res.response(ggfs.summary(req.path())); + + break; + + case UPDATE: + res.response(ggfs.update(req.path(), req.properties())); + + break; + + case RENAME: + ggfs.rename(req.path(), req.destinationPath()); + + res.response(true); + + break; + + case DELETE: + res.response(ggfs.delete(req.path(), req.flag())); + + break; + + case MAKE_DIRECTORIES: + ggfs.mkdirs(req.path(), req.properties()); + + res.response(true); + + break; + + case LIST_PATHS: + res.paths(ggfs.listPaths(req.path())); + + break; + + case LIST_FILES: + res.files(ggfs.listFiles(req.path())); + + break; + + case SET_TIMES: + ggfs.setTimes(req.path(), req.accessTime(), req.modificationTime()); + + res.response(true); + + break; + + case AFFINITY: + res.locations(ggfs.affinity(req.path(), req.start(), req.length())); + + break; + + case OPEN_READ: { + IgfsInputStreamAdapter ggfsIn = !req.flag() ? ggfs.open(req.path(), bufSize) : + ggfs.open(req.path(), bufSize, req.sequentialReadsBeforePrefetch()); + + long streamId = registerResource(ses, ggfsIn); + + if (log.isDebugEnabled()) + log.debug("Opened GGFS input stream for file read [ggfsName=" + ggfs.name() + ", path=" + + req.path() + ", streamId=" + streamId + ", ses=" + ses + ']'); + + IgfsFileInfo info = new IgfsFileInfo(ggfsIn.fileInfo(), null, + ggfsIn.fileInfo().modificationTime()); + + res.response(new IgfsInputStreamDescriptor(streamId, info.length())); + + break; + } + + case OPEN_CREATE: { + long streamId = registerResource(ses, ggfs.create( + req.path(), // Path. + bufSize, // Buffer size. + req.flag(), // Overwrite if exists. + affinityKey(req), // Affinity key based on replication factor. + req.replication(),// Replication factor. + req.blockSize(), // Block size. + req.properties() // File properties. + )); + + if (log.isDebugEnabled()) + log.debug("Opened GGFS output stream for file create [ggfsName=" + ggfs.name() + ", path=" + + req.path() + ", streamId=" + streamId + ", ses=" + ses + ']'); + + res.response(streamId); + + break; + } + + case OPEN_APPEND: { + long streamId = registerResource(ses, ggfs.append( + req.path(), // Path. + bufSize, // Buffer size. + req.flag(), // Create if absent. + req.properties() // File properties. + )); + + if (log.isDebugEnabled()) + log.debug("Opened GGFS output stream for file append [ggfsName=" + ggfs.name() + ", path=" + + req.path() + ", streamId=" + streamId + ", ses=" + ses + ']'); + + res.response(streamId); + + break; + } + + default: + assert false : "Unhandled path control request command: " + cmd; + + break; + } + } + catch (IgniteException e) { + throw new IgniteCheckedException(e); + } + + if (log.isDebugEnabled()) + log.debug("Finished processing path control request [ggfsName=" + ggfs.name() + ", req=" + req + + ", res=" + res + ']'); + + return res; + } + + /** + * Processes stream control request. + * + * @param ses Session. + * @param cmd Command. + * @param msg Message. + * @param in Data input to read. + * @return Response message if needed. + * @throws IgniteCheckedException If failed. + * @throws IOException If failed. + */ + private IgfsMessage processStreamControlRequest(IgfsClientSession ses, IgfsIpcCommand cmd, + IgfsMessage msg, DataInput in) throws IgniteCheckedException, IOException { + IgfsStreamControlRequest req = (IgfsStreamControlRequest)msg; + + Long rsrcId = req.streamId(); + + IgfsControlResponse resp = new IgfsControlResponse(); + + switch (cmd) { + case CLOSE: { + Closeable res = resource(ses, rsrcId); + + if (log.isDebugEnabled()) + log.debug("Requested to close resource [ggfsName=" + ggfs.name() + ", rsrcId=" + rsrcId + + ", res=" + res + ']'); + + if (res == null) + throw new IgniteCheckedException("Resource to close not found: " + rsrcId); + + try { + res.close(); + } + catch (IOException e) { + // Unwrap OutOfSpaceException, if has one. + IgfsOutOfSpaceException space = X.cause(e, IgfsOutOfSpaceException.class); + + if (space != null) + throw space; + + throw e; + } + + boolean success = ses.unregisterResource(rsrcId, res); + + assert success : "Failed to unregister resource [ggfsName=" + ggfs.name() + ", rsrcId=" + rsrcId + + ", res=" + res + ']'; + + if (log.isDebugEnabled()) + log.debug("Closed GGFS stream [ggfsName=" + ggfs.name() + ", streamId=" + rsrcId + + ", ses=" + ses + ']'); + + resp.response(true); + + break; + } + + case READ_BLOCK: { + long pos = req.position(); + int size = req.length(); + + IgfsInputStreamAdapter ggfsIn = (IgfsInputStreamAdapter)resource(ses, rsrcId); + + if (ggfsIn == null) + throw new IgniteCheckedException("Input stream not found (already closed?): " + rsrcId); + + byte[][] chunks = ggfsIn.readChunks(pos, size); + + resp.response(chunks); + + // Calculate number of read bytes. + // len = len(first) + (n - 2) * len(block) + len(last). + int len = 0; + + if (chunks.length > 0) + len += chunks[0].length; + + if (chunks.length > 1) + len += chunks[chunks.length - 1].length; + + if (chunks.length > 2) + len += chunks[1].length * (chunks.length - 2); + + resp.length(len); + + break; + } + + case WRITE_BLOCK: { + assert rsrcId != null : "Missing stream ID"; + + IgfsOutputStream out = (IgfsOutputStream)resource(ses, rsrcId); + + if (out == null) + throw new IgniteCheckedException("Output stream not found (already closed?): " + rsrcId); + + int writeLen = req.length(); + + try { + out.transferFrom(in, writeLen); + + if (errWrite) + throw new IOException("Failed to write data to server (test)."); + + // No response needed. + return null; + } + catch (IOException e) { + resp.error(rsrcId, e.getMessage()); + + break; + } + } + + default: + assert false; + + break; + } + + return resp; + } + + /** + * @param req Path control request. + * @return Affinity key that maps on local node by the time this method is called if replication factor + * is {@code 0}, {@code null} otherwise. + */ + @Nullable private IgniteUuid affinityKey(IgfsPathControlRequest req) { + // Do not generate affinity key for replicated or near-only cache. + if (!req.colocate()) { + if (log.isDebugEnabled()) + log.debug("Will not generate affinity key for path control request [ggfsName=" + ggfs.name() + + ", req=" + req + ']'); + + return null; + } + + IgniteUuid key = ggfs.nextAffinityKey(); + + if (log.isDebugEnabled()) + log.debug("Generated affinity key for path control request [ggfsName=" + ggfs.name() + + ", req=" + req + ", key=" + key + ']'); + + return key; + } + + /** + * Registers closeable resource within client session. + * + * @param ses IPC session. + * @param rsrc Resource to register. + * @return Registration resource ID. + */ + private long registerResource(IgfsClientSession ses, Closeable rsrc) { + long rsrcId = rsrcIdGen.getAndIncrement(); + + boolean registered = ses.registerResource(rsrcId, rsrc); + + assert registered : "Failed to register resource (duplicate id?): " + rsrcId; + + return rsrcId; + } + + /** + * Gets resource by resource ID from client session. + * + * @param ses Session to get resource from. + * @param rsrcId Resource ID. + * @return Registered resource or {@code null} if not found. + */ + @Nullable private Closeable resource(IgfsClientSession ses, Long rsrcId) { + return ses.resource(rsrcId); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsJobImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsJobImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsJobImpl.java new file mode 100644 index 0000000..3ccb404 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsJobImpl.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.igfs; + +import org.apache.ignite.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.igfs.*; +import org.apache.ignite.igfs.mapreduce.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.resources.*; + +import java.io.*; + +/** + * GGFS job implementation. + */ +public class IgfsJobImpl implements ComputeJob, GridInternalWrapper<IgfsJob> { + /** */ + private static final long serialVersionUID = 0L; + + /** GGFS job. */ + private IgfsJob job; + + /** GGFS name. */ + private String ggfsName; + + /** GGFS path. */ + private IgfsPath path; + + /** Start. */ + private long start; + + /** Length. */ + private long len; + + /** Split resolver. */ + private IgfsRecordResolver rslvr; + + /** Injected grid. */ + @IgniteInstanceResource + private Ignite ignite; + + /** Injected logger. */ + @LoggerResource + private IgniteLogger log; + + /** + * @param job GGFS job. + * @param ggfsName GGFS name. + * @param path Split path. + * @param start Split start offset. + * @param len Split length. + * @param rslvr GGFS split resolver. + */ + public IgfsJobImpl(IgfsJob job, String ggfsName, IgfsPath path, long start, long len, + IgfsRecordResolver rslvr) { + this.job = job; + this.ggfsName = ggfsName; + this.path = path; + this.start = start; + this.len = len; + this.rslvr = rslvr; + } + + /** {@inheritDoc} */ + @Override public Object execute() { + IgniteFs fs = ignite.fileSystem(ggfsName); + + try (IgfsInputStream in = fs.open(path)) { + IgfsFileRange split = new IgfsFileRange(path, start, len); + + if (rslvr != null) { + split = rslvr.resolveRecords(fs, in, split); + + if (split == null) { + log.warning("No data found for split on local node after resolver is applied " + + "[ggfsName=" + ggfsName + ", path=" + path + ", start=" + start + ", len=" + len + ']'); + + return null; + } + } + + in.seek(split.start()); + + return job.execute(fs, new IgfsFileRange(path, split.start(), split.length()), in); + } + catch (IOException e) { + throw new IgniteException("Failed to execute GGFS job for file split [ggfsName=" + ggfsName + + ", path=" + path + ", start=" + start + ", len=" + len + ']', e); + } + } + + /** {@inheritDoc} */ + @Override public void cancel() { + job.cancel(); + } + + /** {@inheritDoc} */ + @Override public IgfsJob userObject() { + return job; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsListingEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsListingEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsListingEntry.java new file mode 100644 index 0000000..508b52a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsListingEntry.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.igfs; + +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; + +import java.io.*; +import java.util.*; + +/** + * Directory listing entry. + */ +public class IgfsListingEntry implements Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** File id. */ + private IgniteUuid fileId; + + /** File affinity key. */ + private IgniteUuid affKey; + + /** Positive block size if file, 0 if directory. */ + private int blockSize; + + /** File length. */ + private long len; + + /** Last access time. */ + private long accessTime; + + /** Last modification time. */ + private long modificationTime; + + /** File properties. */ + private Map<String, String> props; + + /** + * Empty constructor required by {@link Externalizable}. + */ + public IgfsListingEntry() { + // No-op. + } + + /** + * @param fileInfo File info to construct listing entry from. + */ + public IgfsListingEntry(IgfsFileInfo fileInfo) { + fileId = fileInfo.id(); + affKey = fileInfo.affinityKey(); + + if (fileInfo.isFile()) { + blockSize = fileInfo.blockSize(); + len = fileInfo.length(); + } + + props = fileInfo.properties(); + accessTime = fileInfo.accessTime(); + modificationTime = fileInfo.modificationTime(); + } + + /** + * Creates listing entry with updated length. + * + * @param entry Entry. + * @param len New length. + */ + public IgfsListingEntry(IgfsListingEntry entry, long len, long accessTime, long modificationTime) { + fileId = entry.fileId; + affKey = entry.affKey; + blockSize = entry.blockSize; + props = entry.props; + this.accessTime = accessTime; + this.modificationTime = modificationTime; + + this.len = len; + } + + /** + * @return Entry file ID. + */ + public IgniteUuid fileId() { + return fileId; + } + + /** + * @return File affinity key, if specified. + */ + public IgniteUuid affinityKey() { + return affKey; + } + + /** + * @return {@code True} if entry represents file. + */ + public boolean isFile() { + return blockSize > 0; + } + + /** + * @return {@code True} if entry represents directory. + */ + public boolean isDirectory() { + return blockSize == 0; + } + + /** + * @return Block size. + */ + public int blockSize() { + return blockSize; + } + + /** + * @return Length. + */ + public long length() { + return len; + } + + /** + * @return Last access time. + */ + public long accessTime() { + return accessTime; + } + + /** + * @return Last modification time. + */ + public long modificationTime() { + return modificationTime; + } + + /** + * @return Properties map. + */ + public Map<String, String> properties() { + return props; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeGridUuid(out, fileId); + out.writeInt(blockSize); + out.writeLong(len); + U.writeStringMap(out, props); + out.writeLong(accessTime); + out.writeLong(modificationTime); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + fileId = U.readGridUuid(in); + blockSize = in.readInt(); + len = in.readLong(); + props = U.readStringMap(in); + accessTime = in.readLong(); + modificationTime = in.readLong(); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof IgfsListingEntry)) return false; + + IgfsListingEntry that = (IgfsListingEntry)o; + + return fileId.equals(that.fileId); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return fileId.hashCode(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgfsListingEntry.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsLocalMetrics.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsLocalMetrics.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsLocalMetrics.java new file mode 100644 index 0000000..74f793d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsLocalMetrics.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.igfs; + +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.lang.*; +import org.jdk8.backport.*; + +/** + * Value object holding all local GGFS metrics which cannot be determined using file system traversal. + */ +public class IgfsLocalMetrics { + /** Block reads. First value - total reads, second value - reads delegated to the secondary file system. */ + private volatile IgniteBiTuple<LongAdder, LongAdder> blocksRead; + + /** Block writes. First value - total writes, second value - writes delegated to the secondary file system. */ + private volatile IgniteBiTuple<LongAdder, LongAdder> blocksWritten; + + /** Byte reads. First value - total bytes read, second value - consumed time. */ + private volatile IgniteBiTuple<LongAdder, LongAdder> bytesRead; + + /** Byte writes. First value - total bytes written, second value - consumed time. */ + private volatile IgniteBiTuple<LongAdder, LongAdder> bytesWritten; + + /** Number of files opened for read. */ + private final LongAdder filesOpenedForRead = new LongAdder(); + + /** Number of files opened for write. */ + private final LongAdder filesOpenedForWrite = new LongAdder(); + + /** + * Constructor. + */ + IgfsLocalMetrics() { + reset(); + } + + /** + * @return Read bytes. + */ + long readBytes() { + return bytesRead.get1().longValue(); + } + + /** + * @return Read bytes time. + */ + long readBytesTime() { + return bytesRead.get2().longValue(); + } + + /** + * Adds given numbers to read bytes and read time. + * + * @param readBytes Number of bytes read. + * @param readTime Read time. + */ + void addReadBytesTime(long readBytes, long readTime) { + IgniteBiTuple<LongAdder, LongAdder> bytesRead0 = bytesRead; + + bytesRead0.get1().add(readBytes); + bytesRead0.get2().add(readTime); + } + + /** + * @return Written bytes. + */ + long writeBytes() { + return bytesWritten.get1().longValue(); + } + + /** + * @return Write bytes time. + */ + long writeBytesTime() { + return bytesWritten.get2().longValue(); + } + + /** + * Adds given numbers to written bytes and write time. + * + * @param writtenBytes Number of bytes written. + * @param writeTime Write time. + */ + void addWrittenBytesTime(long writtenBytes, long writeTime) { + IgniteBiTuple<LongAdder, LongAdder> bytesWritten0 = bytesWritten; + + bytesWritten0.get1().add(writtenBytes); + bytesWritten0.get2().add(writeTime); + } + + /** + * @return Read blocks. + */ + long readBlocks() { + return blocksRead.get1().longValue(); + } + + /** + * @return Written blocks to secondary file system. + */ + long readBlocksSecondary() { + return blocksRead.get2().longValue(); + } + + /** + * Adds given numbers to read blocks counters. + * + * @param total Total number of blocks read. + * @param secondary Number of blocks read form secondary FS. + */ + void addReadBlocks(int total, int secondary) { + IgniteBiTuple<LongAdder, LongAdder> blocksRead0 = blocksRead; + + blocksRead0.get1().add(total); + blocksRead0.get2().add(secondary); + } + + /** + * @return Written blocks. + */ + long writeBlocks() { + return blocksWritten.get1().longValue(); + } + + /** + * @return Written blocks to secondary file system. + */ + long writeBlocksSecondary() { + return blocksWritten.get2().longValue(); + } + + /** + * Adds given numbers to write blocks counters. + * + * @param total Total number of block written. + * @param secondary Number of blocks written to secondary FS. + */ + void addWriteBlocks(int total, int secondary) { + IgniteBiTuple<LongAdder, LongAdder> blocksWritten0 = blocksWritten; + + blocksWritten0.get1().add(total); + blocksWritten0.get2().add(secondary); + } + + /** + * Increment files opened for read. + */ + void incrementFilesOpenedForRead() { + filesOpenedForRead.increment(); + } + + /** + * Decrement files opened for read. + */ + void decrementFilesOpenedForRead() { + filesOpenedForRead.decrement(); + } + + /** + * @return Files opened for read. + */ + int filesOpenedForRead() { + return filesOpenedForRead.intValue(); + } + + /** + * Increment files opened for write. + */ + void incrementFilesOpenedForWrite() { + filesOpenedForWrite.increment(); + } + + /** + * Decrement files opened for write. + */ + void decrementFilesOpenedForWrite() { + filesOpenedForWrite.decrement(); + } + + /** + * @return Files opened for write. + */ + int filesOpenedForWrite() { + return filesOpenedForWrite.intValue(); + } + + /** + * Reset summary counters. + */ + void reset() { + blocksRead = F.t(new LongAdder(), new LongAdder()); + blocksWritten = F.t(new LongAdder(), new LongAdder()); + bytesRead = F.t(new LongAdder(), new LongAdder()); + bytesWritten = F.t(new LongAdder(), new LongAdder()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsManager.java new file mode 100644 index 0000000..3fc1d0a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsManager.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.igfs; + +import org.apache.ignite.*; + +import java.util.concurrent.atomic.*; + +/** + * Abstract class for GGFS managers. + */ +public abstract class IgfsManager { + /** GGFS context. */ + protected IgfsContext igfsCtx; + + /** Logger. */ + protected IgniteLogger log; + + /** Starting flag. */ + private AtomicBoolean starting = new AtomicBoolean(); + + /** + * Called when GGFS processor is started. + * + * @param ggfsCtx GGFS context. + */ + public void start(IgfsContext ggfsCtx) throws IgniteCheckedException { + if (!starting.compareAndSet(false, true)) + assert false : "Method start is called more than once for manager: " + this; + + assert ggfsCtx != null; + + this.igfsCtx = ggfsCtx; + + log = ggfsCtx.kernalContext().log(getClass()); + + start0(); + + if (log != null && log.isDebugEnabled()) + log.debug(startInfo()); + } + + /** + * Stops manager. + * + * @param cancel Cancel flag. + */ + public final void stop(boolean cancel) { + if (!starting.get()) + // Ignoring attempt to stop manager that has never been started. + return; + + stop0(cancel); + + if (log != null && log.isDebugEnabled()) + log.debug(stopInfo()); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public final void onKernalStart() throws IgniteCheckedException { + onKernalStart0(); + + if (log != null && log.isDebugEnabled()) + log.debug(kernalStartInfo()); + } + + /** + * @param cancel Cancel flag. + */ + public final void onKernalStop(boolean cancel) { + if (!starting.get()) + // Ignoring attempt to stop manager that has never been started. + return; + + onKernalStop0(cancel); + + if (log != null && log.isDebugEnabled()) + log.debug(kernalStopInfo()); + } + + /** + * Start manager implementation. + */ + protected void start0() throws IgniteCheckedException { + // No-op by default. + } + + /** + * Stop manager implementation. + * + * @param cancel Cancel flag. + */ + protected void stop0(boolean cancel) { + // No-op by default. + } + + /** + * @throws IgniteCheckedException If failed. + */ + protected void onKernalStart0() throws IgniteCheckedException { + // No-op. + } + + /** + * + */ + protected void onKernalStop0(boolean cancel) { + // No-op. + } + + /** + * @return Start info. + */ + protected String startInfo() { + return "Cache manager started: " + getClass().getSimpleName(); + } + + /** + * @return Stop info. + */ + protected String stopInfo() { + return "Cache manager stopped: " + getClass().getSimpleName(); + } + + /** + * @return Start info. + */ + protected String kernalStartInfo() { + return "Cache manager received onKernalStart() callback: " + getClass().getSimpleName(); + } + + /** + * @return Stop info. + */ + protected String kernalStopInfo() { + return "Cache manager received onKernalStop() callback: " + getClass().getSimpleName(); + } +}
