http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsInputStreamImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsInputStreamImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsInputStreamImpl.java deleted file mode 100644 index ae8c043..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsInputStreamImpl.java +++ /dev/null @@ -1,532 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.ggfs; - -import org.apache.ignite.*; -import org.apache.ignite.fs.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.locks.*; - -/** - * Input stream to read data from grid cache with separate blocks. - */ -public class GridGgfsInputStreamImpl extends GridGgfsInputStreamAdapter { - /** Empty chunks result. */ - private static final byte[][] EMPTY_CHUNKS = new byte[0][]; - - /** Meta manager. */ - private final GridGgfsMetaManager meta; - - /** Data manager. */ - private final GridGgfsDataManager data; - - /** Secondary file system reader. */ - @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") - private final IgniteFsReader secReader; - - /** Logger. */ - private IgniteLogger log; - - /** Path to file. */ - protected final IgniteFsPath path; - - /** File descriptor. */ - private volatile GridGgfsFileInfo fileInfo; - - /** The number of already read bytes. Important! Access to the property is guarded by this object lock. */ - private long pos; - - /** Local cache. */ - private final Map<Long, IgniteFuture<byte[]>> locCache; - - /** Maximum local cache size. */ - private final int maxLocCacheSize; - - /** Pending data read futures which were evicted from the local cache before completion. */ - private final Set<IgniteFuture<byte[]>> pendingFuts; - - /** Pending futures lock. */ - private final Lock pendingFutsLock = new ReentrantLock(); - - /** Pending futures condition. */ - private final Condition pendingFutsCond = pendingFutsLock.newCondition(); - - /** Closed flag. */ - private boolean closed; - - /** Number of blocks to prefetch asynchronously. */ - private int prefetchBlocks; - - /** Numbed of blocks that must be read sequentially before prefetch is triggered. */ - private int seqReadsBeforePrefetch; - - /** Bytes read. */ - private long bytes; - - /** Index of the previously read block. 
Initially it is set to -1 indicating that no reads has been made so far. */ - private long prevBlockIdx = -1; - - /** Amount of sequential reads performed. */ - private int seqReads; - - /** Time consumed on reading. */ - private long time; - - /** Local GGFs metrics. */ - private final GridGgfsLocalMetrics metrics; - - /** - * Constructs file output stream. - * - * @param ggfsCtx GGFS context. - * @param path Path to stored file. - * @param fileInfo File info to write binary data to. - * @param prefetchBlocks Number of blocks to prefetch. - * @param seqReadsBeforePrefetch Amount of sequential reads before prefetch is triggered. - * @param secReader Optional secondary file system reader. - * @param metrics Local GGFS metrics. - */ - GridGgfsInputStreamImpl(GridGgfsContext ggfsCtx, IgniteFsPath path, GridGgfsFileInfo fileInfo, int prefetchBlocks, - int seqReadsBeforePrefetch, @Nullable IgniteFsReader secReader, GridGgfsLocalMetrics metrics) { - assert ggfsCtx != null; - assert path != null; - assert fileInfo != null; - assert metrics != null; - - this.path = path; - this.fileInfo = fileInfo; - this.prefetchBlocks = prefetchBlocks; - this.seqReadsBeforePrefetch = seqReadsBeforePrefetch; - this.secReader = secReader; - this.metrics = metrics; - - meta = ggfsCtx.meta(); - data = ggfsCtx.data(); - - log = ggfsCtx.kernalContext().log(IgniteFsInputStream.class); - - maxLocCacheSize = (prefetchBlocks > 0 ? prefetchBlocks : 1) * 3 / 2; - - locCache = new LinkedHashMap<>(maxLocCacheSize, 1.0f); - - pendingFuts = new GridConcurrentHashSet<>(prefetchBlocks > 0 ? prefetchBlocks : 1); - } - - /** - * Gets bytes read. - * - * @return Bytes read. - */ - public synchronized long bytes() { - return bytes; - } - - /** {@inheritDoc} */ - @Override public GridGgfsFileInfo fileInfo() { - return fileInfo; - } - - /** {@inheritDoc} */ - @Override public synchronized int read() throws IOException { - byte[] buf = new byte[1]; - - int read = read(buf, 0, 1); - - if (read == -1) - return -1; // EOF. - - return buf[0] & 0xFF; // Cast to int and cut to *unsigned* byte value. 
- } - - /** {@inheritDoc} */ - @Override public synchronized int read(@NotNull byte[] b, int off, int len) throws IOException { - int read = readFromStore(pos, b, off, len); - - if (read != -1) - pos += read; - - return read; - } - - /** {@inheritDoc} */ - @Override public synchronized void seek(long pos) throws IOException { - if (pos < 0) - throw new IOException("Seek position cannot be negative: " + pos); - - - this.pos = pos; - } - - /** {@inheritDoc} */ - @Override public synchronized long position() throws IOException { - return pos; - } - - /** {@inheritDoc} */ - @Override public synchronized int available() throws IOException { - long l = fileInfo.length() - pos; - - if (l < 0) - return 0; - - if (l > Integer.MAX_VALUE) - return Integer.MAX_VALUE; - - return (int)l; - } - - /** {@inheritDoc} */ - @Override public synchronized void readFully(long pos, byte[] buf) throws IOException { - readFully(pos, buf, 0, buf.length); - } - - /** {@inheritDoc} */ - @Override public synchronized void readFully(long pos, byte[] buf, int off, int len) throws IOException { - for (int readBytes = 0; readBytes < len; ) { - int read = readFromStore(pos + readBytes, buf, off + readBytes, len - readBytes); - - if (read == -1) - throw new EOFException("Failed to read stream fully (stream ends unexpectedly)" + - "[pos=" + pos + ", buf.length=" + buf.length + ", off=" + off + ", len=" + len + ']'); - - readBytes += read; - } - } - - /** {@inheritDoc} */ - @Override public synchronized int read(long pos, byte[] buf, int off, int len) throws IOException { - return readFromStore(pos, buf, off, len); - } - - /** {@inheritDoc} */ - @SuppressWarnings("IfMayBeConditional") - @Override public synchronized byte[][] readChunks(long pos, int len) throws IOException { - // Readable bytes in the file, starting from the specified position. - long readable = fileInfo.length() - pos; - - if (readable <= 0) - return EMPTY_CHUNKS; - - long startTime = System.nanoTime(); - - if (readable < len) - len = (int)readable; // Truncate expected length to available. - - assert len > 0; - - bytes += len; - - int start = (int)(pos / fileInfo.blockSize()); - int end = (int)((pos + len - 1) / fileInfo.blockSize()); - - int chunkCnt = end - start + 1; - - byte[][] chunks = new byte[chunkCnt][]; - - for (int i = 0; i < chunkCnt; i++) { - byte[] block = blockFragmentizerSafe(start + i); - - int blockOff = (int)(pos % fileInfo.blockSize()); - int blockLen = Math.min(len, block.length - blockOff); - - // If whole block can be used as result, do not do array copy. - if (blockLen == block.length) - chunks[i] = block; - else { - // Only first or last block can have non-full data. - assert i == 0 || i == chunkCnt - 1; - - chunks[i] = Arrays.copyOfRange(block, blockOff, blockOff + blockLen); - } - - len -= blockLen; - pos += blockLen; - } - - assert len == 0; - - time += System.nanoTime() - startTime; - - return chunks; - } - - /** {@inheritDoc} */ - @Override public synchronized void close() throws IOException { - try { - if (secReader != null) { - // Close secondary input stream. - secReader.close(); - - // Ensuring local cache futures completion. - for (IgniteFuture<byte[]> fut : locCache.values()) { - try { - fut.get(); - } - catch (IgniteCheckedException ignore) { - // No-op. - } - } - - // Ensuring pending evicted futures completion. - while (!pendingFuts.isEmpty()) { - pendingFutsLock.lock(); - - try { - pendingFutsCond.await(100, TimeUnit.MILLISECONDS); - } - catch (InterruptedException ignore) { - // No-op. 
- } - finally { - pendingFutsLock.unlock(); - } - } - - // Safety to ensure no orphaned data blocks exist in case file was concurrently deleted. - if (!meta.exists(fileInfo.id())) - data.delete(fileInfo); - } - } - catch (IgniteCheckedException e) { - throw new IOError(e); // Something unrecoverable. - } - finally { - closed = true; - - metrics.addReadBytesTime(bytes, time); - - locCache.clear(); - } - } - - /** - * @param pos Position to start reading from. - * @param buf Data buffer to save read data to. - * @param off Offset in the buffer to write data from. - * @param len Length of the data to read from the stream. - * @return Number of actually read bytes. - * @throws IOException In case of any IO exception. - */ - private int readFromStore(long pos, byte[] buf, int off, int len) throws IOException { - if (pos < 0) - throw new IllegalArgumentException("Read position cannot be negative: " + pos); - - if (buf == null) - throw new NullPointerException("Destination buffer cannot be null."); - - if (off < 0 || len < 0 || buf.length < len + off) - throw new IndexOutOfBoundsException("Invalid buffer boundaries " + - "[buf.length=" + buf.length + ", off=" + off + ", len=" + len + ']'); - - if (len == 0) - return 0; // Fully read done: read zero bytes correctly. - - // Readable bytes in the file, starting from the specified position. - long readable = fileInfo.length() - pos; - - if (readable <= 0) - return -1; // EOF. - - long startTime = System.nanoTime(); - - if (readable < len) - len = (int)readable; // Truncate expected length to available. - - assert len > 0; - - byte[] block = blockFragmentizerSafe(pos / fileInfo.blockSize()); - - // Skip bytes to expected position. - int blockOff = (int)(pos % fileInfo.blockSize()); - - len = Math.min(len, block.length - blockOff); - - U.arrayCopy(block, blockOff, buf, off, len); - - bytes += len; - time += System.nanoTime() - startTime; - - return len; - } - - /** - * Method to safely retrieve file block. In case if file block is missing this method will check file map - * and update file info. This may be needed when file that we are reading is concurrently fragmented. - * - * @param blockIdx Block index to read. - * @return Block data. - * @throws IOException If read failed. - */ - private byte[] blockFragmentizerSafe(long blockIdx) throws IOException { - try { - try { - return block(blockIdx); - } - catch (IgniteFsCorruptedFileException e) { - if (log.isDebugEnabled()) - log.debug("Failed to fetch file block [path=" + path + ", fileInfo=" + fileInfo + - ", blockIdx=" + blockIdx + ", errMsg=" + e.getMessage() + ']'); - - // This failure may be caused by file being fragmented. - if (fileInfo.fileMap() != null && !fileInfo.fileMap().ranges().isEmpty()) { - GridGgfsFileInfo newInfo = meta.info(fileInfo.id()); - - // File was deleted. - if (newInfo == null) - throw new IgniteFsFileNotFoundException("Failed to read file block (file was concurrently " + - "deleted) [path=" + path + ", blockIdx=" + blockIdx + ']'); - - fileInfo = newInfo; - - // Must clear cache as it may have failed futures. - locCache.clear(); - - if (log.isDebugEnabled()) - log.debug("Updated input stream file info after block fetch failure [path=" + path - + ", fileInfo=" + fileInfo + ']'); - - return block(blockIdx); - } - - throw new IOException(e.getMessage(), e); - } - } - catch (IgniteCheckedException e) { - throw new IOException(e.getMessage(), e); - } - } - - /** - * @param blockIdx Block index. - * @return File block data. - * @throws IOException If failed. 
- * @throws IgniteCheckedException If failed. - */ - private byte[] block(long blockIdx) throws IOException, IgniteCheckedException { - assert blockIdx >= 0; - - IgniteFuture<byte[]> bytesFut = locCache.get(blockIdx); - - if (bytesFut == null) { - if (closed) - throw new IOException("Stream is already closed: " + this); - - seqReads = (prevBlockIdx != -1 && prevBlockIdx + 1 == blockIdx) ? ++seqReads : 0; - - prevBlockIdx = blockIdx; - - bytesFut = dataBlock(fileInfo, blockIdx); - - assert bytesFut != null; - - addLocalCacheFuture(blockIdx, bytesFut); - } - - // Schedule the next block(s) prefetch. - if (prefetchBlocks > 0 && seqReads >= seqReadsBeforePrefetch - 1) { - for (int i = 1; i <= prefetchBlocks; i++) { - // Ensure that we do not prefetch over file size. - if (fileInfo.blockSize() * (i + blockIdx) >= fileInfo.length()) - break; - else if (locCache.get(blockIdx + i) == null) - addLocalCacheFuture(blockIdx + i, dataBlock(fileInfo, blockIdx + i)); - } - } - - byte[] bytes = bytesFut.get(); - - if (bytes == null) - throw new IgniteFsCorruptedFileException("Failed to retrieve file's data block (corrupted file?) " + - "[path=" + path + ", blockIdx=" + blockIdx + ']'); - - int blockSize = fileInfo.blockSize(); - - if (blockIdx == fileInfo.blocksCount() - 1) - blockSize = (int)(fileInfo.length() % blockSize); - - // If part of the file was reserved for writing, but was not actually written. - if (bytes.length < blockSize) - throw new IOException("Inconsistent file's data block (incorrectly written?)" + - " [path=" + path + ", blockIdx=" + blockIdx + ", blockSize=" + bytes.length + - ", expectedBlockSize=" + blockSize + ", fileBlockSize=" + fileInfo.blockSize() + - ", fileLen=" + fileInfo.length() + ']'); - - return bytes; - } - - /** - * Add local cache future. - * - * @param idx Block index. - * @param fut Future. - */ - private void addLocalCacheFuture(long idx, IgniteFuture<byte[]> fut) { - assert Thread.holdsLock(this); - - if (!locCache.containsKey(idx)) { - if (locCache.size() == maxLocCacheSize) { - final IgniteFuture<byte[]> evictFut = locCache.remove(locCache.keySet().iterator().next()); - - if (!evictFut.isDone()) { - pendingFuts.add(evictFut); - - evictFut.listenAsync(new IgniteInClosure<IgniteFuture<byte[]>>() { - @Override public void apply(IgniteFuture<byte[]> t) { - pendingFuts.remove(evictFut); - - pendingFutsLock.lock(); - - try { - pendingFutsCond.signalAll(); - } - finally { - pendingFutsLock.unlock(); - } - } - }); - } - } - - locCache.put(idx, fut); - } - } - - /** - * Get data block for specified block index. - * - * @param fileInfo File info. - * @param blockIdx Block index. - * @return Requested data block or {@code null} if nothing found. - * @throws IgniteCheckedException If failed. - */ - @Nullable protected IgniteFuture<byte[]> dataBlock(GridGgfsFileInfo fileInfo, long blockIdx) throws IgniteCheckedException { - return data.dataBlock(fileInfo, path, blockIdx, secReader); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridGgfsInputStreamImpl.class, this); - } -}
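For reference, the byte-to-block mapping that readFromStore() and readChunks() in the deleted GridGgfsInputStreamImpl above rely on reduces to integer division and modulo on the file position, plus truncation of the requested length to the readable remainder. A minimal standalone sketch of that arithmetic follows; the class name and the sample block/file sizes are illustrative and not part of the module:

// BlockMath.java -- standalone sketch of the position-to-block arithmetic; illustrative only.
public final class BlockMath {
    /** Index of the block containing the given file position. */
    static long blockIndex(long pos, int blockSize) {
        return pos / blockSize;
    }

    /** Offset of the given file position inside its block. */
    static int blockOffset(long pos, int blockSize) {
        return (int)(pos % blockSize);
    }

    /** Truncates a requested length to the bytes actually readable from pos, or -1 at EOF. */
    static int truncate(long fileLen, long pos, int len) {
        long readable = fileLen - pos;

        if (readable <= 0)
            return -1; // Mirrors readFromStore() returning -1 at EOF.

        return (int)Math.min(len, readable);
    }

    public static void main(String[] args) {
        int blockSize = 512;  // Illustrative block size.
        long fileLen = 1300;  // Illustrative file length.
        long pos = 1000;

        int len = truncate(fileLen, pos, 600);           // 300 readable bytes remain.
        long start = blockIndex(pos, blockSize);         // Block 1.
        long end = blockIndex(pos + len - 1, blockSize); // Block 2, as in readChunks().

        System.out.println("len=" + len + ", blocks " + start + ".." + end +
            ", offset in first block=" + blockOffset(pos, blockSize));
        // Prints: len=300, blocks 1..2, offset in first block=488
    }
}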
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsInvalidRangeException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsInvalidRangeException.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsInvalidRangeException.java deleted file mode 100644 index 234194a..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsInvalidRangeException.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.ggfs; - -import org.apache.ignite.*; - -/** - * Internal exception thrown when attempted to update range that is no longer present - * in file affinity map. - */ -public class GridGgfsInvalidRangeException extends IgniteCheckedException { - /** */ - private static final long serialVersionUID = 0L; - - /** - * @param msg Error message. - */ - public GridGgfsInvalidRangeException(String msg) { - super(msg); - } - - /** - * @param cause Error cause. - */ - public GridGgfsInvalidRangeException(Throwable cause) { - super(cause); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsIpcHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsIpcHandler.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsIpcHandler.java deleted file mode 100644 index e7dd415..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsIpcHandler.java +++ /dev/null @@ -1,564 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.gridgain.grid.kernal.processors.ggfs; - -import org.apache.ignite.*; -import org.apache.ignite.fs.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.internal.fs.common.*; -import org.gridgain.grid.kernal.processors.closure.*; -import org.apache.ignite.internal.processors.license.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.internal.util.future.*; -import org.apache.ignite.internal.util.lang.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.internal.processors.license.GridLicenseSubsystem.*; - -/** - * GGFS IPC handler. - */ -class GridGgfsIpcHandler implements GridGgfsServerHandler { - /** For test purposes only. */ - @SuppressWarnings("UnusedDeclaration") - private static boolean errWrite; - - /** Kernal context. */ - private final GridKernalContext ctx; - - /** Log. */ - private IgniteLogger log; - - /** Buffer size. */ - private final int bufSize; // Buffer size. Must not be less then file block size. - - /** Ggfs instance for this handler. */ - private GridGgfsEx ggfs; - - /** Resource ID generator. */ - private AtomicLong rsrcIdGen = new AtomicLong(); - - /** Stopping flag. */ - private volatile boolean stopping; - - /** Management connection. */ - private final boolean mgmt; - - /** - * Constructs GGFS IPC handler. - */ - GridGgfsIpcHandler(GridGgfsContext ggfsCtx, boolean mgmt) { - assert ggfsCtx != null; - - this.mgmt = mgmt; - ctx = ggfsCtx.kernalContext(); - ggfs = ggfsCtx.ggfs(); - - // Keep buffer size multiple of block size so no extra byte array copies is performed. - bufSize = ggfsCtx.configuration().getBlockSize() * 2; - - log = ctx.log(GridGgfsIpcHandler.class); - } - - /** {@inheritDoc} */ - @Override public void stop() throws IgniteCheckedException { - stopping = true; - } - - /** {@inheritDoc} */ - @Override public void onClosed(GridGgfsClientSession ses) { - Iterator<Closeable> it = ses.registeredResources(); - - while (it.hasNext()) { - Closeable stream = it.next(); - - try { - stream.close(); - } - catch (IOException e) { - U.warn(log, "Failed to close opened stream on client close event (will continue) [ses=" + ses + - ", stream=" + stream + ']', e); - } - } - } - - /** {@inheritDoc} */ - @Override public IgniteFuture<GridGgfsMessage> handleAsync(final GridGgfsClientSession ses, - final GridGgfsMessage msg, DataInput in) { - if (!mgmt) - GridLicenseUseRegistry.onUsage(HADOOP, getClass()); - - try { - // Even if will be closed right after this call, response write error will be ignored. - if (stopping) - return null; - - final GridGgfsIpcCommand cmd = msg.command(); - - IgniteFuture<GridGgfsMessage> fut; - - switch (cmd) { - // Execute not-blocking command synchronously in worker thread. - case WRITE_BLOCK: - case MAKE_DIRECTORIES: - case LIST_FILES: - case LIST_PATHS: { - GridGgfsMessage res = execute(ses, cmd, msg, in); - - fut = res == null ? null : new GridFinishedFuture<>(ctx, res); - - break; - } - - // Execute command asynchronously in user's pool. - default: { - fut = ctx.closure().callLocalSafe(new GridPlainCallable<GridGgfsMessage>() { - @Override public GridGgfsMessage call() throws Exception { - // No need to pass data input for non-write-block commands. - return execute(ses, cmd, msg, null); - } - }, GridClosurePolicy.GGFS_POOL); - } - } - - // Pack result object into response format. 
- return fut; - } - catch (Exception e) { - return new GridFinishedFuture<>(ctx, e); - } - } - - /** - * Execute GGFS command. - * - * @param ses Client connection session. - * @param cmd Command to execute. - * @param msg Message to process. - * @param in Data input in case of block write command. - * @return Command execution result. - * @throws Exception If failed. - */ - private GridGgfsMessage execute(GridGgfsClientSession ses, GridGgfsIpcCommand cmd, GridGgfsMessage msg, - @Nullable DataInput in) - throws Exception { - switch (cmd) { - case HANDSHAKE: - return processHandshakeRequest((GridGgfsHandshakeRequest)msg); - - case STATUS: - return processStatusRequest(); - - case EXISTS: - case INFO: - case PATH_SUMMARY: - case UPDATE: - case RENAME: - case DELETE: - case MAKE_DIRECTORIES: - case LIST_PATHS: - case LIST_FILES: - case SET_TIMES: - case AFFINITY: - case OPEN_READ: - case OPEN_CREATE: - case OPEN_APPEND: - return processPathControlRequest(ses, cmd, msg); - - case CLOSE: - case READ_BLOCK: - case WRITE_BLOCK: - return processStreamControlRequest(ses, cmd, msg, in); - - default: - throw new IgniteCheckedException("Unsupported IPC command: " + cmd); - } - } - - /** - * Processes handshake request. - * - * @param req Handshake request. - * @return Response message. - * @throws IgniteCheckedException In case of handshake failure. - */ - private GridGgfsMessage processHandshakeRequest(GridGgfsHandshakeRequest req) throws IgniteCheckedException { - if (!F.eq(ctx.gridName(), req.gridName())) - throw new IgniteCheckedException("Failed to perform handshake because actual Grid name differs from expected " + - "[expected=" + req.gridName() + ", actual=" + ctx.gridName() + ']'); - - if (!F.eq(ggfs.name(), req.ggfsName())) - throw new IgniteCheckedException("Failed to perform handshake because actual GGFS name differs from expected " + - "[expected=" + req.ggfsName() + ", actual=" + ggfs.name() + ']'); - - GridGgfsControlResponse res = new GridGgfsControlResponse(); - - ggfs.clientLogDirectory(req.logDirectory()); - - GridGgfsHandshakeResponse handshake = new GridGgfsHandshakeResponse(ggfs.name(), ggfs.proxyPaths(), - ggfs.groupBlockSize(), ggfs.globalSampling()); - - res.handshake(handshake); - - return res; - } - - /** - * Processes status request. - * - * @return Status response. - */ - private GridGgfsMessage processStatusRequest() throws IgniteCheckedException { - GridGgfsStatus status = ggfs.globalSpace(); - - GridGgfsControlResponse res = new GridGgfsControlResponse(); - - res.status(status); - - return res; - } - - /** - * Processes path control request. - * - * @param ses Session. - * @param cmd Command. - * @param msg Message. - * @return Response message. - * @throws IgniteCheckedException If failed. 
- */ - private GridGgfsMessage processPathControlRequest(GridGgfsClientSession ses, GridGgfsIpcCommand cmd, - GridGgfsMessage msg) throws IgniteCheckedException { - GridGgfsPathControlRequest req = (GridGgfsPathControlRequest)msg; - - if (log.isDebugEnabled()) - log.debug("Processing path control request [ggfsName=" + ggfs.name() + ", req=" + req + ']'); - - GridGgfsControlResponse res = new GridGgfsControlResponse(); - - switch (cmd) { - case EXISTS: - res.response(ggfs.exists(req.path())); - - break; - - case INFO: - res.response(ggfs.info(req.path())); - - break; - - case PATH_SUMMARY: - res.response(ggfs.summary(req.path())); - - break; - - case UPDATE: - res.response(ggfs.update(req.path(), req.properties())); - - break; - - case RENAME: - ggfs.rename(req.path(), req.destinationPath()); - - res.response(true); - - break; - - case DELETE: - res.response(ggfs.delete(req.path(), req.flag())); - - break; - - case MAKE_DIRECTORIES: - ggfs.mkdirs(req.path(), req.properties()); - - res.response(true); - - break; - - case LIST_PATHS: - res.paths(ggfs.listPaths(req.path())); - - break; - - case LIST_FILES: - res.files(ggfs.listFiles(req.path())); - - break; - - case SET_TIMES: - ggfs.setTimes(req.path(), req.accessTime(), req.modificationTime()); - - res.response(true); - - break; - - case AFFINITY: - res.locations(ggfs.affinity(req.path(), req.start(), req.length())); - - break; - - case OPEN_READ: { - GridGgfsInputStreamAdapter ggfsIn = !req.flag() ? ggfs.open(req.path(), bufSize) : - ggfs.open(req.path(), bufSize, req.sequentialReadsBeforePrefetch()); - - long streamId = registerResource(ses, ggfsIn); - - if (log.isDebugEnabled()) - log.debug("Opened GGFS input stream for file read [ggfsName=" + ggfs.name() + ", path=" + - req.path() + ", streamId=" + streamId + ", ses=" + ses + ']'); - - GridGgfsFileInfo info = new GridGgfsFileInfo(ggfsIn.fileInfo(), null, - ggfsIn.fileInfo().modificationTime()); - - res.response(new GridGgfsInputStreamDescriptor(streamId, info.length())); - - break; - } - - case OPEN_CREATE: { - long streamId = registerResource(ses, ggfs.create( - req.path(), // Path. - bufSize, // Buffer size. - req.flag(), // Overwrite if exists. - affinityKey(req), // Affinity key based on replication factor. - req.replication(),// Replication factor. - req.blockSize(), // Block size. - req.properties() // File properties. - )); - - if (log.isDebugEnabled()) - log.debug("Opened GGFS output stream for file create [ggfsName=" + ggfs.name() + ", path=" + - req.path() + ", streamId=" + streamId + ", ses=" + ses + ']'); - - res.response(streamId); - - break; - } - - case OPEN_APPEND: { - long streamId = registerResource(ses, ggfs.append( - req.path(), // Path. - bufSize, // Buffer size. - req.flag(), // Create if absent. - req.properties() // File properties. - )); - - if (log.isDebugEnabled()) - log.debug("Opened GGFS output stream for file append [ggfsName=" + ggfs.name() + ", path=" + - req.path() + ", streamId=" + streamId + ", ses=" + ses + ']'); - - res.response(streamId); - - break; - } - - default: - assert false : "Unhandled path control request command: " + cmd; - - break; - } - - if (log.isDebugEnabled()) - log.debug("Finished processing path control request [ggfsName=" + ggfs.name() + ", req=" + req + - ", res=" + res + ']'); - - return res; - } - - /** - * Processes stream control request. - * - * @param ses Session. - * @param cmd Command. - * @param msg Message. - * @param in Data input to read. - * @return Response message if needed. 
- * @throws IgniteCheckedException If failed. - * @throws IOException If failed. - */ - private GridGgfsMessage processStreamControlRequest(GridGgfsClientSession ses, GridGgfsIpcCommand cmd, - GridGgfsMessage msg, DataInput in) throws IgniteCheckedException, IOException { - GridGgfsStreamControlRequest req = (GridGgfsStreamControlRequest)msg; - - Long rsrcId = req.streamId(); - - GridGgfsControlResponse resp = new GridGgfsControlResponse(); - - switch (cmd) { - case CLOSE: { - Closeable res = resource(ses, rsrcId); - - if (log.isDebugEnabled()) - log.debug("Requested to close resource [ggfsName=" + ggfs.name() + ", rsrcId=" + rsrcId + - ", res=" + res + ']'); - - if (res == null) - throw new IgniteCheckedException("Resource to close not found: " + rsrcId); - - try { - res.close(); - } - catch (IOException e) { - // Unwrap OutOfSpaceException, if has one. - IgniteFsOutOfSpaceException space = X.cause(e, IgniteFsOutOfSpaceException.class); - - if (space != null) - throw space; - - throw e; - } - - boolean success = ses.unregisterResource(rsrcId, res); - - assert success : "Failed to unregister resource [ggfsName=" + ggfs.name() + ", rsrcId=" + rsrcId + - ", res=" + res + ']'; - - if (log.isDebugEnabled()) - log.debug("Closed GGFS stream [ggfsName=" + ggfs.name() + ", streamId=" + rsrcId + - ", ses=" + ses + ']'); - - resp.response(true); - - break; - } - - case READ_BLOCK: { - long pos = req.position(); - int size = req.length(); - - GridGgfsInputStreamAdapter ggfsIn = (GridGgfsInputStreamAdapter)resource(ses, rsrcId); - - if (ggfsIn == null) - throw new IgniteCheckedException("Input stream not found (already closed?): " + rsrcId); - - byte[][] chunks = ggfsIn.readChunks(pos, size); - - resp.response(chunks); - - // Calculate number of read bytes. - // len = len(first) + (n - 2) * len(block) + len(last). - int len = 0; - - if (chunks.length > 0) - len += chunks[0].length; - - if (chunks.length > 1) - len += chunks[chunks.length - 1].length; - - if (chunks.length > 2) - len += chunks[1].length * (chunks.length - 2); - - resp.length(len); - - break; - } - - case WRITE_BLOCK: { - assert rsrcId != null : "Missing stream ID"; - - IgniteFsOutputStream out = (IgniteFsOutputStream)resource(ses, rsrcId); - - if (out == null) - throw new IgniteCheckedException("Output stream not found (already closed?): " + rsrcId); - - int writeLen = req.length(); - - try { - out.transferFrom(in, writeLen); - - if (errWrite) - throw new IOException("Failed to write data to server (test)."); - - // No response needed. - return null; - } - catch (IOException e) { - resp.error(rsrcId, e.getMessage()); - - break; - } - } - - default: - assert false; - - break; - } - - return resp; - } - - /** - * @param req Path control request. - * @return Affinity key that maps on local node by the time this method is called if replication factor - * is {@code 0}, {@code null} otherwise. - */ - @Nullable private IgniteUuid affinityKey(GridGgfsPathControlRequest req) { - // Do not generate affinity key for replicated or near-only cache. - if (!req.colocate()) { - if (log.isDebugEnabled()) - log.debug("Will not generate affinity key for path control request [ggfsName=" + ggfs.name() + - ", req=" + req + ']'); - - return null; - } - - IgniteUuid key = ggfs.nextAffinityKey(); - - if (log.isDebugEnabled()) - log.debug("Generated affinity key for path control request [ggfsName=" + ggfs.name() + - ", req=" + req + ", key=" + key + ']'); - - return key; - } - - /** - * Registers closeable resource within client session. 
- * - * @param ses IPC session. - * @param rsrc Resource to register. - * @return Registration resource ID. - */ - private long registerResource(GridGgfsClientSession ses, Closeable rsrc) { - long rsrcId = rsrcIdGen.getAndIncrement(); - - boolean registered = ses.registerResource(rsrcId, rsrc); - - assert registered : "Failed to register resource (duplicate id?): " + rsrcId; - - return rsrcId; - } - - /** - * Gets resource by resource ID from client session. - * - * @param ses Session to get resource from. - * @param rsrcId Resource ID. - * @return Registered resource or {@code null} if not found. - */ - @Nullable private Closeable resource(GridGgfsClientSession ses, Long rsrcId) { - return ses.resource(rsrcId); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsJobImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsJobImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsJobImpl.java deleted file mode 100644 index f75d664..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsJobImpl.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.ggfs; - -import org.apache.ignite.*; -import org.apache.ignite.compute.*; -import org.apache.ignite.fs.*; -import org.apache.ignite.fs.mapreduce.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.resources.*; - -import java.io.*; - -/** - * GGFS job implementation. - */ -public class GridGgfsJobImpl implements ComputeJob, GridInternalWrapper<IgniteFsJob> { - /** */ - private static final long serialVersionUID = 0L; - - /** GGFS job. */ - private IgniteFsJob job; - - /** GGFS name. */ - private String ggfsName; - - /** GGFS path. */ - private IgniteFsPath path; - - /** Start. */ - private long start; - - /** Length. */ - private long len; - - /** Split resolver. */ - private IgniteFsRecordResolver rslvr; - - /** Injected grid. */ - @IgniteInstanceResource - private Ignite ignite; - - /** Injected logger. */ - @IgniteLoggerResource - private IgniteLogger log; - - /** - * @param job GGFS job. - * @param ggfsName GGFS name. - * @param path Split path. - * @param start Split start offset. - * @param len Split length. - * @param rslvr GGFS split resolver. 
- */ - public GridGgfsJobImpl(IgniteFsJob job, String ggfsName, IgniteFsPath path, long start, long len, - IgniteFsRecordResolver rslvr) { - this.job = job; - this.ggfsName = ggfsName; - this.path = path; - this.start = start; - this.len = len; - this.rslvr = rslvr; - } - - /** {@inheritDoc} */ - @Override public Object execute() throws IgniteCheckedException { - IgniteFs ggfs = ignite.fileSystem(ggfsName); - - try (IgniteFsInputStream in = ggfs.open(path)) { - IgniteFsFileRange split = new IgniteFsFileRange(path, start, len); - - if (rslvr != null) { - split = rslvr.resolveRecords(ggfs, in, split); - - if (split == null) { - log.warning("No data found for split on local node after resolver is applied " + - "[ggfsName=" + ggfsName + ", path=" + path + ", start=" + start + ", len=" + len + ']'); - - return null; - } - } - - in.seek(split.start()); - - return job.execute(ggfs, new IgniteFsFileRange(path, split.start(), split.length()), in); - } - catch (IOException e) { - throw new IgniteCheckedException("Failed to execute GGFS job for file split [ggfsName=" + ggfsName + - ", path=" + path + ", start=" + start + ", len=" + len + ']', e); - } - } - - /** {@inheritDoc} */ - @Override public void cancel() { - job.cancel(); - } - - /** {@inheritDoc} */ - @Override public IgniteFsJob userObject() { - return job; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsListingEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsListingEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsListingEntry.java deleted file mode 100644 index c191ec3..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsListingEntry.java +++ /dev/null @@ -1,197 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.ggfs; - -import org.apache.ignite.lang.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.io.*; -import java.util.*; - -/** - * Directory listing entry. - */ -public class GridGgfsListingEntry implements Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** File id. */ - private IgniteUuid fileId; - - /** File affinity key. */ - private IgniteUuid affKey; - - /** Positive block size if file, 0 if directory. */ - private int blockSize; - - /** File length. */ - private long len; - - /** Last access time. */ - private long accessTime; - - /** Last modification time. */ - private long modificationTime; - - /** File properties. 
*/ - private Map<String, String> props; - - /** - * Empty constructor required by {@link Externalizable}. - */ - public GridGgfsListingEntry() { - // No-op. - } - - /** - * @param fileInfo File info to construct listing entry from. - */ - public GridGgfsListingEntry(GridGgfsFileInfo fileInfo) { - fileId = fileInfo.id(); - affKey = fileInfo.affinityKey(); - - if (fileInfo.isFile()) { - blockSize = fileInfo.blockSize(); - len = fileInfo.length(); - } - - props = fileInfo.properties(); - accessTime = fileInfo.accessTime(); - modificationTime = fileInfo.modificationTime(); - } - - /** - * Creates listing entry with updated length. - * - * @param entry Entry. - * @param len New length. - */ - public GridGgfsListingEntry(GridGgfsListingEntry entry, long len, long accessTime, long modificationTime) { - fileId = entry.fileId; - affKey = entry.affKey; - blockSize = entry.blockSize; - props = entry.props; - this.accessTime = accessTime; - this.modificationTime = modificationTime; - - this.len = len; - } - - /** - * @return Entry file ID. - */ - public IgniteUuid fileId() { - return fileId; - } - - /** - * @return File affinity key, if specified. - */ - public IgniteUuid affinityKey() { - return affKey; - } - - /** - * @return {@code True} if entry represents file. - */ - public boolean isFile() { - return blockSize > 0; - } - - /** - * @return {@code True} if entry represents directory. - */ - public boolean isDirectory() { - return blockSize == 0; - } - - /** - * @return Block size. - */ - public int blockSize() { - return blockSize; - } - - /** - * @return Length. - */ - public long length() { - return len; - } - - /** - * @return Last access time. - */ - public long accessTime() { - return accessTime; - } - - /** - * @return Last modification time. - */ - public long modificationTime() { - return modificationTime; - } - - /** - * @return Properties map. 
- */ - public Map<String, String> properties() { - return props; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeGridUuid(out, fileId); - out.writeInt(blockSize); - out.writeLong(len); - U.writeStringMap(out, props); - out.writeLong(accessTime); - out.writeLong(modificationTime); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - fileId = U.readGridUuid(in); - blockSize = in.readInt(); - len = in.readLong(); - props = U.readStringMap(in); - accessTime = in.readLong(); - modificationTime = in.readLong(); - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) return true; - if (!(o instanceof GridGgfsListingEntry)) return false; - - GridGgfsListingEntry that = (GridGgfsListingEntry)o; - - return fileId.equals(that.fileId); - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return fileId.hashCode(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridGgfsListingEntry.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsLocalMetrics.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsLocalMetrics.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsLocalMetrics.java deleted file mode 100644 index 34901c5..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsLocalMetrics.java +++ /dev/null @@ -1,212 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.ggfs; - -import org.apache.ignite.lang.*; -import org.apache.ignite.internal.util.typedef.*; -import org.jdk8.backport.*; - -/** - * Value object holding all local GGFS metrics which cannot be determined using file system traversal. - */ -public class GridGgfsLocalMetrics { - /** Block reads. First value - total reads, second value - reads delegated to the secondary file system. */ - private volatile IgniteBiTuple<LongAdder, LongAdder> blocksRead; - - /** Block writes. First value - total writes, second value - writes delegated to the secondary file system. */ - private volatile IgniteBiTuple<LongAdder, LongAdder> blocksWritten; - - /** Byte reads. First value - total bytes read, second value - consumed time. */ - private volatile IgniteBiTuple<LongAdder, LongAdder> bytesRead; - - /** Byte writes. First value - total bytes written, second value - consumed time. 
*/ - private volatile IgniteBiTuple<LongAdder, LongAdder> bytesWritten; - - /** Number of files opened for read. */ - private final LongAdder filesOpenedForRead = new LongAdder(); - - /** Number of files opened for write. */ - private final LongAdder filesOpenedForWrite = new LongAdder(); - - /** - * Constructor. - */ - GridGgfsLocalMetrics() { - reset(); - } - - /** - * @return Read bytes. - */ - long readBytes() { - return bytesRead.get1().longValue(); - } - - /** - * @return Read bytes time. - */ - long readBytesTime() { - return bytesRead.get2().longValue(); - } - - /** - * Adds given numbers to read bytes and read time. - * - * @param readBytes Number of bytes read. - * @param readTime Read time. - */ - void addReadBytesTime(long readBytes, long readTime) { - IgniteBiTuple<LongAdder, LongAdder> bytesRead0 = bytesRead; - - bytesRead0.get1().add(readBytes); - bytesRead0.get2().add(readTime); - } - - /** - * @return Written bytes. - */ - long writeBytes() { - return bytesWritten.get1().longValue(); - } - - /** - * @return Write bytes time. - */ - long writeBytesTime() { - return bytesWritten.get2().longValue(); - } - - /** - * Adds given numbers to written bytes and write time. - * - * @param writtenBytes Number of bytes written. - * @param writeTime Write time. - */ - void addWrittenBytesTime(long writtenBytes, long writeTime) { - IgniteBiTuple<LongAdder, LongAdder> bytesWritten0 = bytesWritten; - - bytesWritten0.get1().add(writtenBytes); - bytesWritten0.get2().add(writeTime); - } - - /** - * @return Read blocks. - */ - long readBlocks() { - return blocksRead.get1().longValue(); - } - - /** - * @return Written blocks to secondary file system. - */ - long readBlocksSecondary() { - return blocksRead.get2().longValue(); - } - - /** - * Adds given numbers to read blocks counters. - * - * @param total Total number of blocks read. - * @param secondary Number of blocks read form secondary FS. - */ - void addReadBlocks(int total, int secondary) { - IgniteBiTuple<LongAdder, LongAdder> blocksRead0 = blocksRead; - - blocksRead0.get1().add(total); - blocksRead0.get2().add(secondary); - } - - /** - * @return Written blocks. - */ - long writeBlocks() { - return blocksWritten.get1().longValue(); - } - - /** - * @return Written blocks to secondary file system. - */ - long writeBlocksSecondary() { - return blocksWritten.get2().longValue(); - } - - /** - * Adds given numbers to write blocks counters. - * - * @param total Total number of block written. - * @param secondary Number of blocks written to secondary FS. - */ - void addWriteBlocks(int total, int secondary) { - IgniteBiTuple<LongAdder, LongAdder> blocksWritten0 = blocksWritten; - - blocksWritten0.get1().add(total); - blocksWritten0.get2().add(secondary); - } - - /** - * Increment files opened for read. - */ - void incrementFilesOpenedForRead() { - filesOpenedForRead.increment(); - } - - /** - * Decrement files opened for read. - */ - void decrementFilesOpenedForRead() { - filesOpenedForRead.decrement(); - } - - /** - * @return Files opened for read. - */ - int filesOpenedForRead() { - return filesOpenedForRead.intValue(); - } - - /** - * Increment files opened for write. - */ - void incrementFilesOpenedForWrite() { - filesOpenedForWrite.increment(); - } - - /** - * Decrement files opened for write. - */ - void decrementFilesOpenedForWrite() { - filesOpenedForWrite.decrement(); - } - - /** - * @return Files opened for write. - */ - int filesOpenedForWrite() { - return filesOpenedForWrite.intValue(); - } - - /** - * Reset summary counters. 
- */ - void reset() { - blocksRead = F.t(new LongAdder(), new LongAdder()); - blocksWritten = F.t(new LongAdder(), new LongAdder()); - bytesRead = F.t(new LongAdder(), new LongAdder()); - bytesWritten = F.t(new LongAdder(), new LongAdder()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsManager.java deleted file mode 100644 index e5da7a8..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsManager.java +++ /dev/null @@ -1,156 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.ggfs; - -import org.apache.ignite.*; -import org.gridgain.grid.*; - -import java.util.concurrent.atomic.*; - -/** - * Abstract class for GGFS managers. - */ -public abstract class GridGgfsManager { - /** GGFS context. */ - protected GridGgfsContext ggfsCtx; - - /** Logger. */ - protected IgniteLogger log; - - /** Starting flag. */ - private AtomicBoolean starting = new AtomicBoolean(); - - /** - * Called when GGFS processor is started. - * - * @param ggfsCtx GGFS context. - */ - public void start(GridGgfsContext ggfsCtx) throws IgniteCheckedException { - if (!starting.compareAndSet(false, true)) - assert false : "Method start is called more than once for manager: " + this; - - assert ggfsCtx != null; - - this.ggfsCtx = ggfsCtx; - - log = ggfsCtx.kernalContext().log(getClass()); - - start0(); - - if (log != null && log.isDebugEnabled()) - log.debug(startInfo()); - } - - /** - * Stops manager. - * - * @param cancel Cancel flag. - */ - public final void stop(boolean cancel) { - if (!starting.get()) - // Ignoring attempt to stop manager that has never been started. - return; - - stop0(cancel); - - if (log != null && log.isDebugEnabled()) - log.debug(stopInfo()); - } - - /** - * @throws IgniteCheckedException If failed. - */ - public final void onKernalStart() throws IgniteCheckedException { - onKernalStart0(); - - if (log != null && log.isDebugEnabled()) - log.debug(kernalStartInfo()); - } - - /** - * @param cancel Cancel flag. - */ - public final void onKernalStop(boolean cancel) { - if (!starting.get()) - // Ignoring attempt to stop manager that has never been started. - return; - - onKernalStop0(cancel); - - if (log != null && log.isDebugEnabled()) - log.debug(kernalStopInfo()); - } - - /** - * Start manager implementation. - */ - protected void start0() throws IgniteCheckedException { - // No-op by default. 
- } - - /** - * Stop manager implementation. - * - * @param cancel Cancel flag. - */ - protected void stop0(boolean cancel) { - // No-op by default. - } - - /** - * @throws IgniteCheckedException If failed. - */ - protected void onKernalStart0() throws IgniteCheckedException { - // No-op. - } - - /** - * - */ - protected void onKernalStop0(boolean cancel) { - // No-op. - } - - /** - * @return Start info. - */ - protected String startInfo() { - return "Cache manager started: " + getClass().getSimpleName(); - } - - /** - * @return Stop info. - */ - protected String stopInfo() { - return "Cache manager stopped: " + getClass().getSimpleName(); - } - - /** - * @return Start info. - */ - protected String kernalStartInfo() { - return "Cache manager received onKernalStart() callback: " + getClass().getSimpleName(); - } - - /** - * @return Stop info. - */ - protected String kernalStopInfo() { - return "Cache manager received onKernalStop() callback: " + getClass().getSimpleName(); - } -}
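The start()/stop() pattern in the deleted GridGgfsManager above hinges on a single AtomicBoolean compare-and-set: the first start() wins, a second start() is flagged as a programming error, and stop() silently ignores a manager that was never started. A condensed sketch of that lifecycle guard, with illustrative class and method names:

import java.util.concurrent.atomic.AtomicBoolean;

// LifecycleGuard.java -- condensed sketch of the manager start/stop guard; illustrative names.
public class LifecycleGuard {
    private final AtomicBoolean starting = new AtomicBoolean();

    /** First caller wins; a repeated start is a programming error. */
    public void start() {
        if (!starting.compareAndSet(false, true))
            // The original flags a double start with an assertion rather than an exception.
            throw new IllegalStateException("start() called more than once: " + this);

        // Manager-specific start logic would go here.
    }

    /** Stop is a no-op for a manager that was never started. */
    public void stop() {
        if (!starting.get())
            return;

        // Manager-specific stop logic would go here.
    }
}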

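Finally, the READ_BLOCK branch of the deleted GridGgfsIpcHandler above computes the number of bytes returned to the client without summing every chunk: readChunks() guarantees that only the first and last chunks can be partial, so all middle chunks share one length. A minimal standalone sketch of that calculation (class name and sample sizes are illustrative):

// ChunkLength.java -- standalone sketch of the READ_BLOCK length calculation; illustrative only.
public final class ChunkLength {
    /**
     * Total bytes across chunks, computed as len(first) + len(last) + (n - 2) * len(middle).
     * Assumes only the first and last chunks may be shorter than a full block.
     */
    static int totalLength(byte[][] chunks) {
        int len = 0;

        if (chunks.length > 0)
            len += chunks[0].length;

        if (chunks.length > 1)
            len += chunks[chunks.length - 1].length;

        if (chunks.length > 2)
            len += chunks[1].length * (chunks.length - 2);

        return len;
    }

    public static void main(String[] args) {
        byte[][] chunks = { new byte[24], new byte[512], new byte[512], new byte[100] };

        System.out.println(totalLength(chunks)); // 24 + 100 + 512 * 2 = 1148
    }
}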