http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgniteFsFileImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgniteFsFileImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgniteFsFileImpl.java new file mode 100644 index 0000000..b694d7b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgniteFsFileImpl.java @@ -0,0 +1,245 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.fs;

import org.apache.ignite.fs.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.jetbrains.annotations.*;

import java.io.*;
import java.util.*;

/**
 * File or directory information.
 * <p>
 * Holds the path, block sizes, length, timestamps and properties of a single file system entry.
 * Equality and hash code are based on the path only. The file ID is not part of the externalized
 * form (see {@link #writeExternal(ObjectOutput)}), so it is {@code null} on a deserialized instance.
 */
public final class IgniteFsFileImpl implements IgniteFsFile, Externalizable {
    /** */
    private static final long serialVersionUID = 0L;

    /** Path to this file. */
    private IgniteFsPath path;

    /** File id. */
    private IgniteUuid fileId;

    /** Block size. Zero means this entry is a directory (see {@link #isDirectory()}). */
    private int blockSize;

    /** Group block size. */
    private long grpBlockSize;

    /** File length. */
    private long len;

    /** Last access time. */
    private long accessTime;

    /** Last modification time. */
    private long modificationTime;

    /** Properties. Never {@code null} once the instance is constructed or deserialized. */
    private Map<String, String> props;

    /**
     * Empty constructor required by {@link Externalizable}.
     */
    public IgniteFsFileImpl() {
        // No-op.
    }

    /**
     * Constructs file or directory instance from a file info.
     * <p>
     * Block size, length and group block size are copied only when {@code info} describes a file;
     * for a directory they stay zero.
     *
     * @param path Path.
     * @param info File info to copy attributes from.
     * @param globalGrpBlockSize Group block size to use when the file has no affinity key or has zero length.
     */
    public IgniteFsFileImpl(IgniteFsPath path, GridGgfsFileInfo info, long globalGrpBlockSize) {
        A.notNull(path, "path");
        A.notNull(info, "info");

        this.path = path;
        fileId = info.id();

        if (info.isFile()) {
            blockSize = info.blockSize();
            len = info.length();

            // A file with an affinity key and non-zero length uses its own length as the group block size.
            grpBlockSize = info.affinityKey() == null ? globalGrpBlockSize :
                info.length() == 0 ? globalGrpBlockSize : info.length();
        }

        props = nonNullProperties(info.properties());

        accessTime = info.accessTime();
        modificationTime = info.modificationTime();
    }

    /**
     * Constructs file instance from a directory listing entry.
     *
     * @param path Path.
     * @param entry Listing entry.
     * @param globalGrpSize Group block size to use when the entry has no affinity key or has zero length.
     */
    public IgniteFsFileImpl(IgniteFsPath path, GridGgfsListingEntry entry, long globalGrpSize) {
        A.notNull(path, "path");
        A.notNull(entry, "entry");

        this.path = path;
        fileId = entry.fileId();

        blockSize = entry.blockSize();

        grpBlockSize = entry.affinityKey() == null ? globalGrpSize :
            entry.length() == 0 ? globalGrpSize : entry.length();

        len = entry.length();

        // Keep this constructor consistent with the file-info one: properties are never null.
        props = nonNullProperties(entry.properties());

        accessTime = entry.accessTime();
        modificationTime = entry.modificationTime();
    }

    /**
     * Replaces a missing properties map with an empty one so property lookups never fail on {@code null}.
     *
     * @param props Properties, possibly {@code null}.
     * @return Given map, or an empty map if {@code props} is {@code null}.
     */
    private static Map<String, String> nonNullProperties(@Nullable Map<String, String> props) {
        return props == null ? Collections.<String, String>emptyMap() : props;
    }

    /** {@inheritDoc} */
    @Override public IgniteFsPath path() {
        return path;
    }

    /**
     * @return File ID.
     */
    public IgniteUuid fileId() {
        return fileId;
    }

    /** {@inheritDoc} */
    @Override public boolean isFile() {
        return blockSize > 0;
    }

    /** {@inheritDoc} */
    @Override public boolean isDirectory() {
        return blockSize == 0;
    }

    /** {@inheritDoc} */
    @Override public long length() {
        return len;
    }

    /** {@inheritDoc} */
    @Override public int blockSize() {
        return blockSize;
    }

    /** {@inheritDoc} */
    @Override public long groupBlockSize() {
        return grpBlockSize;
    }

    /** {@inheritDoc} */
    @Override public long accessTime() {
        return accessTime;
    }

    /** {@inheritDoc} */
    @Override public long modificationTime() {
        return modificationTime;
    }

    /** {@inheritDoc} */
    @Override public String property(String name) throws IllegalArgumentException {
        String val = props.get(name);

        if (val == null)
            throw new IllegalArgumentException("File property not found [path=" + path + ", name=" + name + ']');

        return val;
    }

    /** {@inheritDoc} */
    @Override public String property(String name, @Nullable String dfltVal) {
        String val = props.get(name);

        return val == null ? dfltVal : val;
    }

    /** {@inheritDoc} */
    @Override public Map<String, String> properties() {
        return props;
    }

    /**
     * Writes object to data output. The file ID is not written.
     *
     * @param out Data output.
     * @throws IOException If write failed.
     */
    @Override public void writeExternal(ObjectOutput out) throws IOException {
        path.writeExternal(out);

        out.writeInt(blockSize);
        out.writeLong(grpBlockSize);
        out.writeLong(len);
        U.writeStringMap(out, props);
        out.writeLong(accessTime);
        out.writeLong(modificationTime);
    }

    /**
     * Reads object from data input, in the same order as {@link #writeExternal(ObjectOutput)} wrote it.
     *
     * @param in Data input.
     * @throws IOException If read failed.
     */
    @Override public void readExternal(ObjectInput in) throws IOException {
        path = new IgniteFsPath();

        path.readExternal(in);

        blockSize = in.readInt();
        grpBlockSize = in.readLong();
        len = in.readLong();

        // Guard against a null map coming back from the stream, so property lookups stay safe.
        props = nonNullProperties(U.readStringMap(in));

        accessTime = in.readLong();
        modificationTime = in.readLong();
    }

    /** {@inheritDoc} */
    @Override public int hashCode() {
        return path.hashCode();
    }

    /** {@inheritDoc} */
    @Override public boolean equals(Object o) {
        if (o == this)
            return true;

        if (o == null || getClass() != o.getClass())
            return false;

        IgniteFsFileImpl that = (IgniteFsFileImpl)o;

        return path.equals(that.path);
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(IgniteFsFileImpl.class, this);
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgniteFsMetricsAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgniteFsMetricsAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgniteFsMetricsAdapter.java new file mode 100644 index 0000000..461e32d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgniteFsMetricsAdapter.java @@ -0,0 +1,239 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.fs;

import org.apache.ignite.fs.*;
import org.apache.ignite.internal.util.typedef.internal.*;

import java.io.*;

/**
 * Serializable holder of GGFS metric values: space usage, entry counts, open-file counts,
 * and block/byte transfer counters.
 */
public class IgniteFsMetricsAdapter implements IgniteFsMetrics, Externalizable {
    /** */
    private static final long serialVersionUID = 0L;

    /** Space used on the local node. */
    private long locSpaceSize;

    /** Maximum space size. */
    private long maxSpaceSize;

    /** Space used in the secondary file system. */
    private long secondarySpaceSize;

    /** Directories count. */
    private int dirsCnt;

    /** Files count. */
    private int filesCnt;

    /** Count of files currently opened for read. */
    private int filesOpenedForRead;

    /** Count of files currently opened for write. */
    private int filesOpenedForWrite;

    /** Blocks read in total. */
    private long blocksReadTotal;

    /** Blocks read remotely. */
    private long blocksReadRmt;

    /** Blocks written in total. */
    private long blocksWrittenTotal;

    /** Blocks written remotely. */
    private long blocksWrittenRmt;

    /** Bytes read in total. */
    private long bytesRead;

    /** Time spent reading bytes. */
    private long bytesReadTime;

    /** Bytes written in total. */
    private long bytesWritten;

    /** Time spent writing bytes. */
    private long bytesWriteTime;

    /**
     * No-arg constructor needed for {@link Externalizable} deserialization.
     */
    public IgniteFsMetricsAdapter() {
        // No-op.
    }

    /**
     * Creates a metrics holder from the given values.
     *
     * @param locSpaceSize Used space on local node.
     * @param maxSpaceSize Maximum space size.
     * @param secondarySpaceSize Secondary space size.
     * @param dirsCnt Number of directories.
     * @param filesCnt Number of files.
     * @param filesOpenedForRead Number of files opened for read.
     * @param filesOpenedForWrite Number of files opened for write.
     * @param blocksReadTotal Total blocks read.
     * @param blocksReadRmt Total blocks read remotely.
     * @param blocksWrittenTotal Total blocks written.
     * @param blocksWrittenRmt Total blocks written remotely.
     * @param bytesRead Total bytes read.
     * @param bytesReadTime Total bytes read time.
     * @param bytesWritten Total bytes written.
     * @param bytesWriteTime Total bytes write time.
     */
    public IgniteFsMetricsAdapter(long locSpaceSize, long maxSpaceSize, long secondarySpaceSize, int dirsCnt,
        int filesCnt, int filesOpenedForRead, int filesOpenedForWrite, long blocksReadTotal, long blocksReadRmt,
        long blocksWrittenTotal, long blocksWrittenRmt, long bytesRead, long bytesReadTime, long bytesWritten,
        long bytesWriteTime) {
        // Space usage.
        this.locSpaceSize = locSpaceSize;
        this.maxSpaceSize = maxSpaceSize;
        this.secondarySpaceSize = secondarySpaceSize;

        // Entry and open-file counts.
        this.dirsCnt = dirsCnt;
        this.filesCnt = filesCnt;
        this.filesOpenedForRead = filesOpenedForRead;
        this.filesOpenedForWrite = filesOpenedForWrite;

        // Block counters.
        this.blocksReadTotal = blocksReadTotal;
        this.blocksReadRmt = blocksReadRmt;
        this.blocksWrittenTotal = blocksWrittenTotal;
        this.blocksWrittenRmt = blocksWrittenRmt;

        // Byte counters and timings.
        this.bytesRead = bytesRead;
        this.bytesReadTime = bytesReadTime;
        this.bytesWritten = bytesWritten;
        this.bytesWriteTime = bytesWriteTime;
    }

    /** {@inheritDoc} */
    @Override
    public long localSpaceSize() {
        return this.locSpaceSize;
    }

    /** {@inheritDoc} */
    @Override
    public long maxSpaceSize() {
        return this.maxSpaceSize;
    }

    /** {@inheritDoc} */
    @Override
    public long secondarySpaceSize() {
        return this.secondarySpaceSize;
    }

    /** {@inheritDoc} */
    @Override
    public int directoriesCount() {
        return this.dirsCnt;
    }

    /** {@inheritDoc} */
    @Override
    public int filesCount() {
        return this.filesCnt;
    }

    /** {@inheritDoc} */
    @Override
    public int filesOpenedForRead() {
        return this.filesOpenedForRead;
    }

    /** {@inheritDoc} */
    @Override
    public int filesOpenedForWrite() {
        return this.filesOpenedForWrite;
    }

    /** {@inheritDoc} */
    @Override
    public long blocksReadTotal() {
        return this.blocksReadTotal;
    }

    /** {@inheritDoc} */
    @Override
    public long blocksReadRemote() {
        return this.blocksReadRmt;
    }

    /** {@inheritDoc} */
    @Override
    public long blocksWrittenTotal() {
        return this.blocksWrittenTotal;
    }

    /** {@inheritDoc} */
    @Override
    public long blocksWrittenRemote() {
        return this.blocksWrittenRmt;
    }

    /** {@inheritDoc} */
    @Override
    public long bytesRead() {
        return this.bytesRead;
    }

    /** {@inheritDoc} */
    @Override
    public long bytesReadTime() {
        return this.bytesReadTime;
    }

    /** {@inheritDoc} */
    @Override
    public long bytesWritten() {
        return this.bytesWritten;
    }

    /** {@inheritDoc} */
    @Override
    public long bytesWriteTime() {
        return this.bytesWriteTime;
    }

    /**
     * Writes all metric values in field declaration order.
     *
     * @param out Output to write to.
     * @throws IOException If write failed.
     */
    @Override
    public void writeExternal(ObjectOutput out) throws IOException {
        out.writeLong(locSpaceSize);
        out.writeLong(maxSpaceSize);
        out.writeLong(secondarySpaceSize);

        out.writeInt(dirsCnt);
        out.writeInt(filesCnt);
        out.writeInt(filesOpenedForRead);
        out.writeInt(filesOpenedForWrite);

        out.writeLong(blocksReadTotal);
        out.writeLong(blocksReadRmt);
        out.writeLong(blocksWrittenTotal);
        out.writeLong(blocksWrittenRmt);

        out.writeLong(bytesRead);
        out.writeLong(bytesReadTime);
        out.writeLong(bytesWritten);
        out.writeLong(bytesWriteTime);
    }

    /**
     * Reads all metric values in the order {@link #writeExternal(ObjectOutput)} wrote them.
     *
     * @param in Input to read from.
     * @throws IOException If read failed.
     */
    @Override
    public void readExternal(ObjectInput in) throws IOException {
        locSpaceSize = in.readLong();
        maxSpaceSize = in.readLong();
        secondarySpaceSize = in.readLong();

        dirsCnt = in.readInt();
        filesCnt = in.readInt();
        filesOpenedForRead = in.readInt();
        filesOpenedForWrite = in.readInt();

        blocksReadTotal = in.readLong();
        blocksReadRmt = in.readLong();
        blocksWrittenTotal = in.readLong();
        blocksWrittenRmt = in.readLong();

        bytesRead = in.readLong();
        bytesReadTime = in.readLong();
        bytesWritten = in.readLong();
        bytesWriteTime = in.readLong();
    }

    /** {@inheritDoc} */
    @Override
    public String toString() {
        return S.toString(IgniteFsMetricsAdapter.class, this);
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgniteFsOutputStreamAdapter.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgniteFsOutputStreamAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgniteFsOutputStreamAdapter.java new file mode 100644 index 0000000..d3e75b6 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgniteFsOutputStreamAdapter.java @@ -0,0 +1,263 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.fs;

import org.apache.ignite.*;
import org.apache.ignite.fs.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.jetbrains.annotations.*;

import java.io.*;
import java.nio.*;

/**
 * Output stream to store data into grid cache with separate blocks.
 * <p>
 * Written bytes are accumulated in a local buffer and handed to {@link #storeDataBlock(ByteBuffer)}
 * once at least {@code bufSize} bytes are collected, or on {@link #flush()} / {@link #close()}.
 * All public mutating operations are {@code synchronized} on this stream.
 */
@SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
abstract class IgniteFsOutputStreamAdapter extends IgniteFsOutputStream {
    /** Path to file. */
    protected final IgniteFsPath path;

    /** Buffer size. */
    private final int bufSize;

    /** Flag for this stream open/closed state. */
    private boolean closed;

    /** Local buffer to store stream data as consistent block. {@code null} when nothing is buffered. */
    private ByteBuffer buf;

    /** Bytes written. */
    @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
    protected long bytes;

    /** Time consumed by write operations (nanoseconds, measured with {@link System#nanoTime()}). */
    protected long time;

    /**
     * Constructs file output stream.
     *
     * @param path Path to stored file.
     * @param bufSize The size of the buffer to be used.
     */
    IgniteFsOutputStreamAdapter(IgniteFsPath path, int bufSize) {
        assert path != null;
        assert bufSize > 0;

        this.path = path;
        this.bufSize = bufSize;
    }

    /**
     * Gets number of written bytes.
     *
     * @return Written bytes.
     */
    public long bytes() {
        return bytes;
    }

    /** {@inheritDoc} */
    @Override public synchronized void write(int b) throws IOException {
        checkClosed(null, 0);

        long startTime = System.nanoTime();

        b &= 0xFF;

        if (buf == null)
            buf = ByteBuffer.allocate(bufSize);

        buf.put((byte)b);

        if (buf.position() >= bufSize)
            sendData(true); // Send data to server.

        time += System.nanoTime() - startTime;
    }

    /** {@inheritDoc} */
    @Override public synchronized void write(byte[] b, int off, int len) throws IOException {
        A.notNull(b, "b");

        // The last condition catches integer overflow of (off + len).
        if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) {
            throw new IndexOutOfBoundsException("Invalid bounds [data.length=" + b.length + ", offset=" + off +
                ", length=" + len + ']');
        }

        checkClosed(null, 0);

        if (len == 0)
            return; // Done.

        long startTime = System.nanoTime();

        if (buf == null) {
            // Do not allocate and copy byte buffer if will send data immediately.
            if (len >= bufSize) {
                buf = ByteBuffer.wrap(b, off, len);

                sendData(false);

                // The direct-send path is a write operation too: account for its time before returning.
                time += System.nanoTime() - startTime;

                return;
            }

            buf = ByteBuffer.allocate(Math.max(bufSize, len));
        }

        if (buf.remaining() < len)
            // Expand buffer capacity, if remaining size is less than data size.
            buf = ByteBuffer.allocate(buf.position() + len).put((ByteBuffer)buf.flip());

        assert len <= buf.remaining() : "Expects write data size less or equal then remaining buffer capacity " +
            "[len=" + len + ", buf.remaining=" + buf.remaining() + ']';

        buf.put(b, off, len);

        if (buf.position() >= bufSize)
            sendData(true); // Send data to server.

        time += System.nanoTime() - startTime;
    }

    /** {@inheritDoc} */
    @Override public synchronized void transferFrom(DataInput in, int len) throws IOException {
        checkClosed(in, len);

        long startTime = System.nanoTime();

        // Send all IPC data from the local buffer before streaming.
        if (buf != null && buf.position() > 0)
            sendData(true);

        try {
            storeDataBlocks(in, len);
        }
        catch (IgniteCheckedException e) {
            throw new IOException(e.getMessage(), e);
        }

        time += System.nanoTime() - startTime;
    }

    /**
     * Flushes this output stream and forces any buffered output bytes to be written out.
     *
     * @exception IOException if an I/O error occurs.
     */
    @Override public synchronized void flush() throws IOException {
        checkClosed(null, 0);

        // Send all IPC data from the local buffer.
        if (buf != null && buf.position() > 0)
            sendData(true);
    }

    /** {@inheritDoc} */
    @Override public final synchronized void close() throws IOException {
        // Do nothing if stream is already closed.
        if (closed)
            return;

        try {
            // Send all IPC data from the local buffer.
            try {
                flush();
            }
            finally {
                onClose(); // "onClose()" routine must be invoked anyway!
            }
        }
        finally {
            // Mark this stream closed AFTER flush.
            closed = true;
        }
    }

    /**
     * Store data blocks in file.<br/>
     * Note! If file concurrently deleted we'll get lost blocks.
     *
     * @param data Data to store.
     * @throws IgniteCheckedException If failed.
     * @throws IOException In case of IO exception.
     */
    protected abstract void storeDataBlock(ByteBuffer data) throws IgniteCheckedException, IOException;

    /**
     * Store data blocks in file reading appropriate number of bytes from given data input.
     *
     * @param in Data input to read from.
     * @param len Data length to store.
     * @throws IgniteCheckedException If failed.
     * @throws IOException In case of IO exception.
     */
    protected abstract void storeDataBlocks(DataInput in, int len) throws IgniteCheckedException, IOException;

    /**
     * Close callback. It will be called only once in synchronized section.
     *
     * @throws IOException If failed.
     */
    protected void onClose() throws IOException {
        // No-op.
    }

    /**
     * Validate this stream is open.
     *
     * @param in Optional data input to drain when the stream is closed.
     * @param len Number of bytes to skip from {@code in} before failing.
     * @throws IOException If this stream is closed.
     */
    private void checkClosed(@Nullable DataInput in, int len) throws IOException {
        assert Thread.holdsLock(this);

        if (closed) {
            // Must read data from stream before throwing exception.
            if (in != null)
                in.skipBytes(len);

            throw new IOException("Stream has been closed: " + this);
        }
    }

    /**
     * Send all local-buffered data to server.
     *
     * @param flip Whether to flip buffer on sending data. We do not want to flip it if sending wrapped
     *      byte array.
     * @throws IOException In case of IO exception.
     */
    private void sendData(boolean flip) throws IOException {
        assert Thread.holdsLock(this);

        try {
            if (flip)
                buf.flip();

            storeDataBlock(buf);
        }
        catch (IgniteCheckedException e) {
            throw new IOException("Failed to store data into file: " + path, e);
        }

        // Buffer is handed over; the next write allocates a fresh one.
        buf = null;
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(IgniteFsOutputStreamAdapter.class, this);
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgniteFsOutputStreamImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgniteFsOutputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgniteFsOutputStreamImpl.java new file mode 100644 index 0000000..f069eae --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgniteFsOutputStreamImpl.java @@ -0,0 +1,504 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.ignite.internal.processors.fs;

import org.apache.ignite.*;
import org.apache.ignite.fs.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.internal.processors.task.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.internal.util.future.*;
import org.jetbrains.annotations.*;

import java.io.*;
import java.nio.*;
import java.util.concurrent.atomic.*;

import static org.apache.ignite.fs.IgniteFsMode.*;

/**
 * Output stream to store data into grid cache with separate blocks.
 * <p>
 * Data shorter than a file block is kept in a local remainder array until enough bytes arrive to
 * fill a block or until the stream is flushed. The file length in meta data is updated on flush.
 */
class IgniteFsOutputStreamImpl extends IgniteFsOutputStreamAdapter {
    /** Maximum number of blocks in buffer. */
    private static final int MAX_BLOCKS_CNT = 16;

    /** GGFS context. */
    private GridGgfsContext ggfsCtx;

    /** Meta info manager. */
    private final GridGgfsMetaManager meta;

    /** Data manager. */
    private final GridGgfsDataManager data;

    /** File descriptor. Replaced with the updated info on every flush that reserved space. */
    @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
    private GridGgfsFileInfo fileInfo;

    /** Parent ID. */
    private final IgniteUuid parentId;

    /** File name. */
    private final String fileName;

    /** Space in file to write data: bytes accepted since the last flush that updated the file length. */
    @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
    private long space;

    /** Intermediate remainder to keep data. */
    private byte[] remainder;

    /** Data length in remainder. */
    private int remainderDataLen;

    /** Write completion future. */
    private final IgniteFuture<Boolean> writeCompletionFut;

    /** GGFS mode. */
    private final IgniteFsMode mode;

    /** File worker batch. */
    private final GridGgfsFileWorkerBatch batch;

    /** Ensures that onClose() routine is called no more than once. */
    private final AtomicBoolean onCloseGuard = new AtomicBoolean();

    /** Local GGFS metrics. */
    private final GridGgfsLocalMetrics metrics;

    /** Affinity written by this output stream. */
    private GridGgfsFileAffinityRange streamRange;

    /**
     * Constructs file output stream.
     *
     * @param ggfsCtx GGFS context.
     * @param path Path to stored file.
     * @param fileInfo File info to write binary data to.
     * @param parentId Parent ID.
     * @param bufSize The size of the buffer to be used.
     * @param mode Grid GGFS mode.
     * @param batch Optional secondary file system batch.
     * @param metrics Local GGFS metrics.
     * @throws IgniteCheckedException If stream creation failed.
     */
    IgniteFsOutputStreamImpl(GridGgfsContext ggfsCtx, IgniteFsPath path, GridGgfsFileInfo fileInfo, IgniteUuid parentId,
        int bufSize, IgniteFsMode mode, @Nullable GridGgfsFileWorkerBatch batch, GridGgfsLocalMetrics metrics)
        throws IgniteCheckedException {
        super(path, optimizeBufferSize(bufSize, fileInfo));

        assert fileInfo != null;
        assert fileInfo.isFile() : "Unexpected file info: " + fileInfo;
        assert mode != null && mode != PROXY;
        // PRIMARY mode has no batch; any other mode requires one.
        assert mode == PRIMARY && batch == null || batch != null;
        assert metrics != null;

        // File hasn't been locked.
        if (fileInfo.lockId() == null)
            throw new IgniteFsException("Failed to acquire file lock (concurrently modified?): " + path);

        this.ggfsCtx = ggfsCtx;
        meta = ggfsCtx.meta();
        data = ggfsCtx.data();

        this.fileInfo = fileInfo;
        this.mode = mode;
        this.batch = batch;
        this.parentId = parentId;
        this.metrics = metrics;

        streamRange = initialStreamRange(fileInfo);

        fileName = path.name();

        writeCompletionFut = data.writeStart(fileInfo);
    }

    /**
     * Optimize buffer size.
     * <p>
     * Without a file info or a positive block size the requested size is returned as is. Otherwise
     * the result is at least one block and at most {@link #MAX_BLOCKS_CNT} blocks; for an empty file
     * it is additionally rounded down to a multiple of the block size.
     *
     * @param bufSize Requested buffer size.
     * @param fileInfo File info.
     * @return Optimized buffer size.
     */
    @SuppressWarnings("IfMayBeConditional")
    private static int optimizeBufferSize(int bufSize, GridGgfsFileInfo fileInfo) {
        assert bufSize > 0;

        if (fileInfo == null)
            return bufSize;

        int blockSize = fileInfo.blockSize();

        if (blockSize <= 0)
            return bufSize;

        if (bufSize <= blockSize)
            // Optimize minimum buffer size to be equal file's block size.
            return blockSize;

        int maxBufSize = blockSize * MAX_BLOCKS_CNT;

        if (bufSize > maxBufSize)
            // There is no profit or optimization from larger buffers.
            return maxBufSize;

        if (fileInfo.length() == 0)
            // Make buffer size multiple of block size (optimized for new files).
            return bufSize / blockSize * blockSize;

        return bufSize;
    }

    /**
     * {@inheritDoc}
     * <p>
     * If the block plus the current remainder is smaller than a file block, the data is only appended
     * to the remainder; otherwise it is passed to the data manager, which returns the new remainder.
     */
    @Override protected synchronized void storeDataBlock(ByteBuffer block) throws IgniteCheckedException, IOException {
        int writeLen = block.remaining();

        preStoreDataBlocks(null, writeLen);

        int blockSize = fileInfo.blockSize();

        // If data length is not enough to fill full block, fill the remainder and return.
        if (remainderDataLen + writeLen < blockSize) {
            if (remainder == null)
                remainder = new byte[blockSize];
            else if (remainder.length != blockSize) {
                assert remainderDataLen == remainder.length;

                // Grow the remainder array to a full block, keeping the bytes it already holds.
                byte[] allocated = new byte[blockSize];

                U.arrayCopy(remainder, 0, allocated, 0, remainder.length);

                remainder = allocated;
            }

            block.get(remainder, remainderDataLen, writeLen);

            remainderDataLen += writeLen;
        }
        else {
            remainder = data.storeDataBlocks(fileInfo, fileInfo.length() + space, remainder, remainderDataLen, block,
                false, streamRange, batch);

            remainderDataLen = remainder == null ? 0 : remainder.length;
        }
    }

    /**
     * {@inheritDoc}
     * <p>
     * Same remainder handling as {@link #storeDataBlock(ByteBuffer)}, but the bytes are read from
     * the given data input.
     */
    @Override protected synchronized void storeDataBlocks(DataInput in, int len) throws IgniteCheckedException, IOException {
        preStoreDataBlocks(in, len);

        int blockSize = fileInfo.blockSize();

        // If data length is not enough to fill full block, fill the remainder and return.
        if (remainderDataLen + len < blockSize) {
            if (remainder == null)
                remainder = new byte[blockSize];
            else if (remainder.length != blockSize) {
                assert remainderDataLen == remainder.length;

                // Grow the remainder array to a full block, keeping the bytes it already holds.
                byte[] allocated = new byte[blockSize];

                U.arrayCopy(remainder, 0, allocated, 0, remainder.length);

                remainder = allocated;
            }

            in.readFully(remainder, remainderDataLen, len);

            remainderDataLen += len;
        }
        else {
            remainder = data.storeDataBlocks(fileInfo, fileInfo.length() + space, remainder, remainderDataLen, in, len,
                false, streamRange, batch);

            remainderDataLen = remainder == null ? 0 : remainder.length;
        }
    }

    /**
     * Checks the write completion future for an earlier failure and updates written space.
     *
     * @param in Optional data input; if the write future is already done, {@code len} bytes are skipped
     *      from it before the future result is requested.
     * @param len Data length to be written.
     * @throws IgniteCheckedException If thrown by the write completion future.
     * @throws IOException If skipping bytes from {@code in} failed.
     */
    private void preStoreDataBlocks(@Nullable DataInput in, int len) throws IgniteCheckedException, IOException {
        // Check if any exception happened while writing data.
        if (writeCompletionFut.isDone()) {
            assert ((GridFutureAdapter)writeCompletionFut).isFailed();

            if (in != null)
                in.skipBytes(len);

            // NOTE(review): presumably rethrows the write failure here - confirm against IgniteFuture.get().
            writeCompletionFut.get();
        }

        bytes += len;
        space += len;
    }

    /**
     * Flushes this output stream and forces any buffered output bytes to be written out.
     * <p>
     * After the base class buffer is flushed, the remainder (if any) is stored and, if any space
     * was written since the previous flush, the file info is updated through the meta manager.
     *
     * @exception IOException if an I/O error occurs.
     */
    @Override public synchronized void flush() throws IOException {
        boolean exists;

        try {
            exists = meta.exists(fileInfo.id());
        }
        catch (IgniteCheckedException e) {
            throw new IOError(e); // Something unrecoverable.
        }

        if (!exists) {
            // Run the close routine right away, telling it the file is known to be deleted.
            onClose(true);

            throw new IOException("File was concurrently deleted: " + path);
        }

        super.flush();

        try {
            if (remainder != null) {
                data.storeDataBlocks(fileInfo, fileInfo.length() + space, null, 0,
                    ByteBuffer.wrap(remainder, 0, remainderDataLen), true, streamRange, batch);

                remainder = null;
                remainderDataLen = 0;
            }

            if (space > 0) {
                GridGgfsFileInfo fileInfo0 = meta.updateInfo(fileInfo.id(),
                    new ReserveSpaceClosure(space, streamRange));

                if (fileInfo0 == null)
                    throw new IOException("File was concurrently deleted: " + path);
                else
                    fileInfo = fileInfo0;

                // Start a new affinity range from the updated file info.
                streamRange = initialStreamRange(fileInfo);

                space = 0;
            }
        }
        catch (IgniteCheckedException e) {
            throw new IOException("Failed to flush data [path=" + path + ", space=" + space + ']', e);
        }
    }

    /** {@inheritDoc} */
    @Override protected void onClose() throws IOException {
        onClose(false);
    }

    /**
     * Close callback. It will be called only once in synchronized section.
     * <p>
     * If the file still exists, the write is closed, the file is unlocked and the parent listing
     * update is scheduled; otherwise the file's data is deleted.
     *
     * @param deleted Whether we already know that the file was deleted.
     * @throws IOException If failed.
     */
    private void onClose(boolean deleted) throws IOException {
        assert Thread.holdsLock(this);

        if (onCloseGuard.compareAndSet(false, true)) {
            // Notify backing secondary file system batch to finish.
            if (mode != PRIMARY) {
                assert batch != null;

                batch.finish();
            }

            // Ensure file existence.
            boolean exists;

            try {
                exists = !deleted && meta.exists(fileInfo.id());
            }
            catch (IgniteCheckedException e) {
                throw new IOError(e); // Something unrecoverable.
            }

            if (exists) {
                // First error is remembered and thrown only after unlock and listing update are done.
                IOException err = null;

                try {
                    data.writeClose(fileInfo);

                    writeCompletionFut.get();
                }
                catch (IgniteCheckedException e) {
                    err = new IOException("Failed to close stream [path=" + path + ", fileInfo=" + fileInfo + ']', e);
                }

                metrics.addWrittenBytesTime(bytes, time);

                // Await secondary file system processing to finish.
                if (mode == DUAL_SYNC) {
                    try {
                        batch.await();
                    }
                    catch (IgniteCheckedException e) {
                        if (err == null)
                            err = new IOException("Failed to close secondary file system stream [path=" + path +
                                ", fileInfo=" + fileInfo + ']', e);
                    }
                }

                long modificationTime = System.currentTimeMillis();

                try {
                    meta.unlock(fileInfo, modificationTime);
                }
                catch (IgniteFsFileNotFoundException ignore) {
                    data.delete(fileInfo); // Safety to ensure that all data blocks are deleted.

                    throw new IOException("File was concurrently deleted: " + path);
                }
                catch (IgniteCheckedException e) {
                    throw new IOError(e); // Something unrecoverable.
                }

                meta.updateParentListingAsync(parentId, fileInfo.id(), fileName, bytes, modificationTime);

                if (err != null)
                    throw err;
            }
            else {
                try {
                    if (mode == DUAL_SYNC)
                        batch.await();
                }
                catch (IgniteCheckedException e) {
                    throw new IOException("Failed to close secondary file system stream [path=" + path +
                        ", fileInfo=" + fileInfo + ']', e);
                }
                finally {
                    // File is gone: delete its data regardless of how the batch wait ended.
                    data.delete(fileInfo);
                }
            }
        }
    }

    /**
     * Gets initial affinity range. This range will have 0 length and will start from first
     * non-occupied file block.
     *
     * @param fileInfo File info to build initial range for.
     * @return Affinity range, or {@code null} if fragmentizer is disabled, local writes are not
     *      preferred for this file, or the data manager provides no affinity key.
     */
    private GridGgfsFileAffinityRange initialStreamRange(GridGgfsFileInfo fileInfo) {
        if (!ggfsCtx.configuration().isFragmentizerEnabled())
            return null;

        if (!Boolean.parseBoolean(fileInfo.properties().get(IgniteFs.PROP_PREFER_LOCAL_WRITES)))
            return null;

        int blockSize = fileInfo.blockSize();

        // Find first non-occupied block offset.
        long off = ((fileInfo.length() + blockSize - 1) / blockSize) * blockSize;

        // Need to get last affinity key and reuse it if we are on the same node.
        long lastBlockOff = off - fileInfo.blockSize();

        if (lastBlockOff < 0)
            lastBlockOff = 0;

        GridGgfsFileMap map = fileInfo.fileMap();

        IgniteUuid prevAffKey = map == null ? null : map.affinityKey(lastBlockOff, false);

        IgniteUuid affKey = data.nextAffinityKey(prevAffKey);

        return affKey == null ? null : new GridGgfsFileAffinityRange(off, off, affKey);
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(IgniteFsOutputStreamImpl.class, this);
    }

    /**
     * Helper closure to reserve specified space and update file's length.
     */
    @GridInternal
    private static final class ReserveSpaceClosure implements IgniteClosure<GridGgfsFileInfo, GridGgfsFileInfo>,
        Externalizable {
        /** */
        private static final long serialVersionUID = 0L;

        /** Space amount (bytes number) to increase file's length. */
        private long space;

        /** Affinity range for this particular update. */
        private GridGgfsFileAffinityRange range;

        /**
         * Empty constructor required for {@link Externalizable}.
         *
         */
        public ReserveSpaceClosure() {
            // No-op.
        }

        /**
         * Constructs the closure to reserve specified space and update file's length.
         *
         * @param space Space amount (bytes number) to increase file's length.
         * @param range Affinity range specifying which part of file was colocated.
         */
        private ReserveSpaceClosure(long space, GridGgfsFileAffinityRange range) {
            this.space = space;
            this.range = range;
        }

        /**
         * Builds a new file info whose length is increased by the reserved space and whose file map
         * is a copy of the old map with this closure's range added.
         *
         * @param oldInfo Current file info.
         * @return Updated file info.
         */
        @Override public GridGgfsFileInfo apply(GridGgfsFileInfo oldInfo) {
            GridGgfsFileMap oldMap = oldInfo.fileMap();

            GridGgfsFileMap newMap = new GridGgfsFileMap(oldMap);

            // NOTE(review): range is null when initialStreamRange() returned null - confirm addRange() accepts null.
            newMap.addRange(range);

            // Update file length.
            GridGgfsFileInfo updated = new GridGgfsFileInfo(oldInfo, oldInfo.length() + space);

            updated.fileMap(newMap);

            return updated;
        }

        /** {@inheritDoc} */
        @Override public void writeExternal(ObjectOutput out) throws IOException {
            out.writeLong(space);
            out.writeObject(range);
        }

        /** {@inheritDoc} */
        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
            space = in.readLong();
            range = (GridGgfsFileAffinityRange)in.readObject();
        }

        /** {@inheritDoc} */
        @Override public String toString() {
            return S.toString(ReserveSpaceClosure.class, this);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgniteFsTaskArgsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgniteFsTaskArgsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgniteFsTaskArgsImpl.java new file mode 100644 index 0000000..25c1542 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgniteFsTaskArgsImpl.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.fs; + +import org.apache.ignite.fs.*; +import org.apache.ignite.fs.mapreduce.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; +import java.util.*; + +/** + * GGFS task arguments implementation. + */ +public class IgniteFsTaskArgsImpl<T> implements IgniteFsTaskArgs<T>, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** GGFS name. */ + private String ggfsName; + + /** Paths. */ + private Collection<IgniteFsPath> paths; + + /** Record resolver. */ + private IgniteFsRecordResolver recRslvr; + + /** Skip non existent files flag. */ + private boolean skipNonExistentFiles; + + /** Maximum range length. */ + private long maxRangeLen; + + /** User argument. */ + private T usrArg; + + /** + * {@link Externalizable} support. + */ + public IgniteFsTaskArgsImpl() { + // No-op. + } + + /** + * Constructor. + * + * @param ggfsName GGFS name. + * @param paths Paths. + * @param recRslvr Record resolver. + * @param skipNonExistentFiles Skip non existent files flag. + * @param maxRangeLen Maximum range length. + * @param usrArg User argument. 
+ */ + public IgniteFsTaskArgsImpl(String ggfsName, Collection<IgniteFsPath> paths, IgniteFsRecordResolver recRslvr, + boolean skipNonExistentFiles, long maxRangeLen, T usrArg) { + this.ggfsName = ggfsName; + this.paths = paths; + this.recRslvr = recRslvr; + this.skipNonExistentFiles = skipNonExistentFiles; + this.maxRangeLen = maxRangeLen; + this.usrArg = usrArg; + } + + /** {@inheritDoc} */ + @Override public String ggfsName() { + return ggfsName; + } + + /** {@inheritDoc} */ + @Override public Collection<IgniteFsPath> paths() { + return paths; + } + + /** {@inheritDoc} */ + @Override public IgniteFsRecordResolver recordResolver() { + return recRslvr; + } + + /** {@inheritDoc} */ + @Override public boolean skipNonExistentFiles() { + return skipNonExistentFiles; + } + + /** {@inheritDoc} */ + @Override public long maxRangeLength() { + return maxRangeLen; + } + + /** {@inheritDoc} */ + @Override public T userArgument() { + return usrArg; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgniteFsTaskArgsImpl.class, this); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeString(out, ggfsName); + U.writeCollection(out, paths); + + out.writeObject(recRslvr); + out.writeBoolean(skipNonExistentFiles); + out.writeLong(maxRangeLen); + out.writeObject(usrArg); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + ggfsName = U.readString(in); + paths = U.readCollection(in); + + recRslvr = (IgniteFsRecordResolver)in.readObject(); + skipNonExistentFiles = in.readBoolean(); + maxRangeLen = in.readLong(); + usrArg = (T)in.readObject(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/package.html ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/package.html b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/package.html new file mode 100644 index 0000000..95b8af7 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/package.html @@ -0,0 +1,23 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> +<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> +<html> +<body> + <!-- Package description. --> + Contains high performance file system processor. 
+</body> +</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/apache/ignite/internal/visor/ggfs/VisorGgfsSamplingStateTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/ggfs/VisorGgfsSamplingStateTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/ggfs/VisorGgfsSamplingStateTask.java index 2c80dfb..6df9bb4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/ggfs/VisorGgfsSamplingStateTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/ggfs/VisorGgfsSamplingStateTask.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.visor.ggfs; import org.apache.ignite.*; import org.apache.ignite.lang.*; -import org.gridgain.grid.kernal.processors.ggfs.*; import org.apache.ignite.internal.processors.task.*; import org.apache.ignite.internal.visor.*; import org.apache.ignite.internal.util.typedef.internal.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java index 67f0d6a..7cc5cfa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java @@ -23,7 +23,6 @@ import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; import org.apache.ignite.streamer.*; -import org.gridgain.grid.kernal.processors.ggfs.*; import org.apache.ignite.internal.visor.*; import 
org.apache.ignite.internal.visor.cache.*; import org.apache.ignite.internal.visor.compute.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java index 36b6d5b..9bb838c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java @@ -25,7 +25,6 @@ import org.apache.ignite.cache.eviction.random.*; import org.apache.ignite.cluster.*; import org.apache.ignite.events.*; import org.apache.ignite.lang.*; -import org.gridgain.grid.kernal.processors.ggfs.*; import org.apache.ignite.internal.visor.event.*; import org.apache.ignite.internal.visor.file.*; import org.apache.ignite.internal.visor.log.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsAckMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsAckMessage.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsAckMessage.java deleted file mode 100644 index 82340ac..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsAckMessage.java +++ /dev/null @@ -1,212 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.ggfs; - -import org.apache.ignite.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.marshaller.*; -import org.apache.ignite.internal.util.direct.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.nio.*; - -/** - * Block write request acknowledgement message. - */ -public class GridGgfsAckMessage extends GridGgfsCommunicationMessage { - /** */ - private static final long serialVersionUID = 0L; - - /** File id. */ - private IgniteUuid fileId; - - /** Request ID to ack. */ - private long id; - - /** Write exception. */ - @GridDirectTransient - private IgniteCheckedException err; - - /** */ - private byte[] errBytes; - - /** - * Empty constructor required by {@link Externalizable}. - */ - public GridGgfsAckMessage() { - // No-op. - } - - /** - * @param fileId File ID. - * @param id Request ID. - * @param err Error. - */ - public GridGgfsAckMessage(IgniteUuid fileId, long id, @Nullable IgniteCheckedException err) { - this.fileId = fileId; - this.id = id; - this.err = err; - } - - /** - * @return File ID. - */ - public IgniteUuid fileId() { - return fileId; - } - - /** - * @return Batch ID. - */ - public long id() { - return id; - } - - /** - * @return Error occurred when writing this batch, if any. 
- */ - public IgniteCheckedException error() { - return err; - } - - /** {@inheritDoc} */ - @Override public void prepareMarshal(IgniteMarshaller marsh) throws IgniteCheckedException { - super.prepareMarshal(marsh); - - if (err != null) - errBytes = marsh.marshal(err); - } - - /** {@inheritDoc} */ - @Override public void finishUnmarshal(IgniteMarshaller marsh, @Nullable ClassLoader ldr) throws IgniteCheckedException { - super.finishUnmarshal(marsh, ldr); - - if (errBytes != null) - err = marsh.unmarshal(errBytes, ldr); - } - - /** {@inheritDoc} */ - @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) - @Override public GridTcpCommunicationMessageAdapter clone() { - GridGgfsAckMessage _clone = new GridGgfsAckMessage(); - - clone0(_clone); - - return _clone; - } - - /** {@inheritDoc} */ - @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { - super.clone0(_msg); - - GridGgfsAckMessage _clone = (GridGgfsAckMessage)_msg; - - _clone.fileId = fileId; - _clone.id = id; - _clone.err = err; - _clone.errBytes = errBytes; - } - - /** {@inheritDoc} */ - @SuppressWarnings("all") - @Override public boolean writeTo(ByteBuffer buf) { - commState.setBuffer(buf); - - if (!super.writeTo(buf)) - return false; - - if (!commState.typeWritten) { - if (!commState.putByte(directType())) - return false; - - commState.typeWritten = true; - } - - switch (commState.idx) { - case 0: - if (!commState.putByteArray(errBytes)) - return false; - - commState.idx++; - - case 1: - if (!commState.putGridUuid(fileId)) - return false; - - commState.idx++; - - case 2: - if (!commState.putLong(id)) - return false; - - commState.idx++; - - } - - return true; - } - - /** {@inheritDoc} */ - @SuppressWarnings("all") - @Override public boolean readFrom(ByteBuffer buf) { - commState.setBuffer(buf); - - if (!super.readFrom(buf)) - return false; - - switch (commState.idx) { - case 0: - byte[] errBytes0 = commState.getByteArray(); - - if (errBytes0 == BYTE_ARR_NOT_READ) - 
return false; - - errBytes = errBytes0; - - commState.idx++; - - case 1: - IgniteUuid fileId0 = commState.getGridUuid(); - - if (fileId0 == GRID_UUID_NOT_READ) - return false; - - fileId = fileId0; - - commState.idx++; - - case 2: - if (buf.remaining() < 8) - return false; - - id = commState.getLong(); - - commState.idx++; - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public byte directType() { - return 65; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsAsyncImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsAsyncImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsAsyncImpl.java deleted file mode 100644 index 6fb7583..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsAsyncImpl.java +++ /dev/null @@ -1,298 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.gridgain.grid.kernal.processors.ggfs; - -import org.apache.ignite.*; -import org.apache.ignite.fs.*; -import org.apache.ignite.fs.mapreduce.*; -import org.apache.ignite.lang.*; -import org.gridgain.grid.*; -import org.jetbrains.annotations.*; - -import java.net.*; -import java.util.*; - -/** - * Ggfs supporting asynchronous operations. - */ -public class GridGgfsAsyncImpl extends IgniteAsyncSupportAdapter implements GridGgfsEx { - /** */ - private final GridGgfsImpl ggfs; - - /** - * @param ggfs Ggfs. - */ - public GridGgfsAsyncImpl(GridGgfsImpl ggfs) { - super(true); - - this.ggfs = ggfs; - } - - /** {@inheritDoc} */ - @Override public IgniteFs enableAsync() { - return this; - } - - /** {@inheritDoc} */ - @Override public void format() throws IgniteCheckedException { - saveOrGet(ggfs.formatAsync()); - } - - /** {@inheritDoc} */ - @Override public <T, R> R execute(IgniteFsTask<T, R> task, @Nullable IgniteFsRecordResolver rslvr, - Collection<IgniteFsPath> paths, @Nullable T arg) throws IgniteCheckedException { - return saveOrGet(ggfs.executeAsync(task, rslvr, paths, arg)); - } - - /** {@inheritDoc} */ - @Override public <T, R> R execute(IgniteFsTask<T, R> task, @Nullable IgniteFsRecordResolver rslvr, - Collection<IgniteFsPath> paths, boolean skipNonExistentFiles, long maxRangeLen, @Nullable T arg) - throws IgniteCheckedException { - return saveOrGet(ggfs.executeAsync(task, rslvr, paths, skipNonExistentFiles, maxRangeLen, arg)); - } - - /** {@inheritDoc} */ - @Override public <T, R> R execute(Class<? extends IgniteFsTask<T, R>> taskCls, - @Nullable IgniteFsRecordResolver rslvr, Collection<IgniteFsPath> paths, @Nullable T arg) throws IgniteCheckedException { - return saveOrGet(ggfs.executeAsync(taskCls, rslvr, paths, arg)); - } - - /** {@inheritDoc} */ - @Override public <T, R> R execute(Class<? 
extends IgniteFsTask<T, R>> taskCls, - @Nullable IgniteFsRecordResolver rslvr, Collection<IgniteFsPath> paths, boolean skipNonExistentFiles, - long maxRangeLen, @Nullable T arg) throws IgniteCheckedException { - return saveOrGet(ggfs.executeAsync(taskCls, rslvr, paths, skipNonExistentFiles, maxRangeLen, arg)); - } - - /** {@inheritDoc} */ - @Override public void stop() { - ggfs.stop(); - } - - /** {@inheritDoc} */ - @Override public GridGgfsContext context() { - return ggfs.context(); - } - - /** {@inheritDoc} */ - @Override public GridGgfsPaths proxyPaths() { - return ggfs.proxyPaths(); - } - - /** {@inheritDoc} */ - @Override public GridGgfsInputStreamAdapter open(IgniteFsPath path, int bufSize, - int seqReadsBeforePrefetch) throws IgniteCheckedException { - return ggfs.open(path, bufSize, seqReadsBeforePrefetch); - } - - /** {@inheritDoc} */ - @Override public GridGgfsInputStreamAdapter open(IgniteFsPath path) throws IgniteCheckedException { - return ggfs.open(path); - } - - /** {@inheritDoc} */ - @Override public GridGgfsInputStreamAdapter open(IgniteFsPath path, int bufSize) throws IgniteCheckedException { - return ggfs.open(path, bufSize); - } - - /** {@inheritDoc} */ - @Override public GridGgfsStatus globalSpace() throws IgniteCheckedException { - return ggfs.globalSpace(); - } - - /** {@inheritDoc} */ - @Override public void globalSampling(@Nullable Boolean val) throws IgniteCheckedException { - ggfs.globalSampling(val); - } - - /** {@inheritDoc} */ - @Nullable @Override public Boolean globalSampling() { - return ggfs.globalSampling(); - } - - /** {@inheritDoc} */ - @Override public GridGgfsLocalMetrics localMetrics() { - return ggfs.localMetrics(); - } - - /** {@inheritDoc} */ - @Override public long groupBlockSize() { - return ggfs.groupBlockSize(); - } - - /** {@inheritDoc} */ - @Override public IgniteFuture<?> awaitDeletesAsync() throws IgniteCheckedException { - return ggfs.awaitDeletesAsync(); - } - - /** {@inheritDoc} */ - @Nullable @Override public 
String clientLogDirectory() { - return ggfs.clientLogDirectory(); - } - - /** {@inheritDoc} */ - @Override public void clientLogDirectory(String logDir) { - ggfs.clientLogDirectory(logDir); - } - - /** {@inheritDoc} */ - @Override public boolean evictExclude(IgniteFsPath path, boolean primary) { - return ggfs.evictExclude(path, primary); - } - - /** {@inheritDoc} */ - @Override public IgniteUuid nextAffinityKey() { - return ggfs.nextAffinityKey(); - } - - /** {@inheritDoc} */ - @Override public boolean isProxy(URI path) { - return ggfs.isProxy(path); - } - - /** {@inheritDoc} */ - @Nullable @Override public String name() { - return ggfs.name(); - } - - /** {@inheritDoc} */ - @Override public IgniteFsConfiguration configuration() { - return ggfs.configuration(); - } - - /** {@inheritDoc} */ - @Override public IgniteFsPathSummary summary(IgniteFsPath path) throws IgniteCheckedException { - return ggfs.summary(path); - } - - /** {@inheritDoc} */ - @Override public IgniteFsOutputStream create(IgniteFsPath path, boolean overwrite) throws IgniteCheckedException { - return ggfs.create(path, overwrite); - } - - /** {@inheritDoc} */ - @Override public IgniteFsOutputStream create(IgniteFsPath path, int bufSize, boolean overwrite, int replication, - long blockSize, @Nullable Map<String, String> props) throws IgniteCheckedException { - return ggfs.create(path, bufSize, overwrite, replication, blockSize, props); - } - - /** {@inheritDoc} */ - @Override public IgniteFsOutputStream create(IgniteFsPath path, int bufSize, boolean overwrite, - @Nullable IgniteUuid affKey, int replication, long blockSize, @Nullable Map<String, String> props) - throws IgniteCheckedException { - return ggfs.create(path, bufSize, overwrite, affKey, replication, blockSize, props); - } - - /** {@inheritDoc} */ - @Override public IgniteFsOutputStream append(IgniteFsPath path, boolean create) throws IgniteCheckedException { - return ggfs.append(path, create); - } - - /** {@inheritDoc} */ - @Override public 
IgniteFsOutputStream append(IgniteFsPath path, int bufSize, boolean create, - @Nullable Map<String, String> props) throws IgniteCheckedException { - return ggfs.append(path, bufSize, create, props); - } - - /** {@inheritDoc} */ - @Override public void setTimes(IgniteFsPath path, long accessTime, long modificationTime) throws IgniteCheckedException { - ggfs.setTimes(path, accessTime, modificationTime); - } - - /** {@inheritDoc} */ - @Override public Collection<IgniteFsBlockLocation> affinity(IgniteFsPath path, long start, long len) - throws IgniteCheckedException { - return ggfs.affinity(path, start, len); - } - - /** {@inheritDoc} */ - @Override public Collection<IgniteFsBlockLocation> affinity(IgniteFsPath path, long start, long len, long maxLen) - throws IgniteCheckedException { - return ggfs.affinity(path, start, len, maxLen); - } - - /** {@inheritDoc} */ - @Override public IgniteFsMetrics metrics() throws IgniteCheckedException { - return ggfs.metrics(); - } - - /** {@inheritDoc} */ - @Override public void resetMetrics() throws IgniteCheckedException { - ggfs.resetMetrics(); - } - - /** {@inheritDoc} */ - @Override public long size(IgniteFsPath path) throws IgniteCheckedException { - return ggfs.size(path); - } - - /** {@inheritDoc} */ - @Override public boolean exists(IgniteFsPath path) throws IgniteCheckedException { - return ggfs.exists(path); - } - - /** {@inheritDoc} */ - @Nullable @Override public IgniteFsFile update(IgniteFsPath path, Map<String, String> props) throws IgniteCheckedException { - return ggfs.update(path, props); - } - - /** {@inheritDoc} */ - @Override public void rename(IgniteFsPath src, IgniteFsPath dest) throws IgniteCheckedException { - ggfs.rename(src, dest); - } - - /** {@inheritDoc} */ - @Override public boolean delete(IgniteFsPath path, boolean recursive) throws IgniteCheckedException { - return ggfs.delete(path, recursive); - } - - /** {@inheritDoc} */ - @Override public void mkdirs(IgniteFsPath path) throws IgniteCheckedException 
{ - ggfs.mkdirs(path); - } - - /** {@inheritDoc} */ - @Override public void mkdirs(IgniteFsPath path, @Nullable Map<String, String> props) throws IgniteCheckedException { - ggfs.mkdirs(path, props); - } - - /** {@inheritDoc} */ - @Override public Collection<IgniteFsPath> listPaths(IgniteFsPath path) throws IgniteCheckedException { - return ggfs.listPaths(path); - } - - /** {@inheritDoc} */ - @Override public Collection<IgniteFsFile> listFiles(IgniteFsPath path) throws IgniteCheckedException { - return ggfs.listFiles(path); - } - - /** {@inheritDoc} */ - @Nullable @Override public IgniteFsFile info(IgniteFsPath path) throws IgniteCheckedException { - return ggfs.info(path); - } - - /** {@inheritDoc} */ - @Override public long usedSpaceSize() throws IgniteCheckedException { - return ggfs.usedSpaceSize(); - } - - /** {@inheritDoc} */ - @Override public Map<String, String> properties() { - return ggfs.properties(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsAttributes.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsAttributes.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsAttributes.java deleted file mode 100644 index 0a16144..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsAttributes.java +++ /dev/null @@ -1,186 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.ggfs; - -import org.apache.ignite.fs.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.io.*; -import java.util.*; - -/** - * GGFS attributes. - * <p> - * This class contains information on a single GGFS configured on some node. - */ -public class GridGgfsAttributes implements Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** GGFS name. */ - private String ggfsName; - - /** File's data block size (bytes). */ - private int blockSize; - - /** Size of the group figured in {@link org.apache.ignite.fs.IgniteFsGroupDataBlocksKeyMapper}. */ - private int grpSize; - - /** Meta cache name. */ - private String metaCacheName; - - /** Data cache name. */ - private String dataCacheName; - - /** Default mode. */ - private IgniteFsMode dfltMode; - - /** Fragmentizer enabled flag. */ - private boolean fragmentizerEnabled; - - /** Path modes. */ - private Map<String, IgniteFsMode> pathModes; - - /** - * @param ggfsName GGFS name. - * @param blockSize File's data block size (bytes). - * @param grpSize Size of the group figured in {@link org.apache.ignite.fs.IgniteFsGroupDataBlocksKeyMapper}. - * @param metaCacheName Meta cache name. - * @param dataCacheName Data cache name. - * @param dfltMode Default mode. - * @param pathModes Path modes. 
- */ - public GridGgfsAttributes(String ggfsName, int blockSize, int grpSize, String metaCacheName, String dataCacheName, - IgniteFsMode dfltMode, Map<String, IgniteFsMode> pathModes, boolean fragmentizerEnabled) { - this.blockSize = blockSize; - this.ggfsName = ggfsName; - this.grpSize = grpSize; - this.metaCacheName = metaCacheName; - this.dataCacheName = dataCacheName; - this.dfltMode = dfltMode; - this.pathModes = pathModes; - this.fragmentizerEnabled = fragmentizerEnabled; - } - - /** - * Public no-arg constructor for {@link Externalizable}. - */ - public GridGgfsAttributes() { - // No-op. - } - - /** - * @return GGFS name. - */ - public String ggfsName() { - return ggfsName; - } - - /** - * @return File's data block size (bytes). - */ - public int blockSize() { - return blockSize; - } - - /** - * @return Size of the group figured in {@link org.apache.ignite.fs.IgniteFsGroupDataBlocksKeyMapper}. - */ - public int groupSize() { - return grpSize; - } - - /** - * @return Metadata cache name. - */ - public String metaCacheName() { - return metaCacheName; - } - - /** - * @return Data cache name. - */ - public String dataCacheName() { - return dataCacheName; - } - - /** - * @return Default mode. - */ - public IgniteFsMode defaultMode() { - return dfltMode; - } - - /** - * @return Path modes. - */ - public Map<String, IgniteFsMode> pathModes() { - return pathModes != null ? Collections.unmodifiableMap(pathModes) : null; - } - - /** - * @return {@code True} if fragmentizer is enabled. 
- */ - public boolean fragmentizerEnabled() { - return fragmentizerEnabled; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeString(out, ggfsName); - out.writeInt(blockSize); - out.writeInt(grpSize); - U.writeString(out, metaCacheName); - U.writeString(out, dataCacheName); - U.writeEnum0(out, dfltMode); - out.writeBoolean(fragmentizerEnabled); - - if (pathModes != null) { - out.writeBoolean(true); - - out.writeInt(pathModes.size()); - - for (Map.Entry<String, IgniteFsMode> pathMode : pathModes.entrySet()) { - U.writeString(out, pathMode.getKey()); - U.writeEnum0(out, pathMode.getValue()); - } - } - else - out.writeBoolean(false); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - ggfsName = U.readString(in); - blockSize = in.readInt(); - grpSize = in.readInt(); - metaCacheName = U.readString(in); - dataCacheName = U.readString(in); - dfltMode = IgniteFsMode.fromOrdinal(U.readEnumOrdinal0(in)); - fragmentizerEnabled = in.readBoolean(); - - if (in.readBoolean()) { - int size = in.readInt(); - - pathModes = new HashMap<>(size, 1.0f); - - for (int i = 0; i < size; i++) - pathModes.put(U.readString(in), IgniteFsMode.fromOrdinal(U.readEnumOrdinal0(in))); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsBlockKey.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsBlockKey.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsBlockKey.java deleted file mode 100644 index 5a35c9b..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsBlockKey.java +++ /dev/null @@ -1,279 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more 
- * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.ggfs; - -import org.apache.ignite.lang.*; -import org.apache.ignite.internal.processors.task.*; -import org.apache.ignite.internal.util.direct.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.nio.*; - -/** - * File's binary data block key. - */ -@GridInternal -public final class GridGgfsBlockKey extends GridTcpCommunicationMessageAdapter - implements Externalizable, Comparable<GridGgfsBlockKey> { - /** */ - private static final long serialVersionUID = 0L; - - /** File system file ID. */ - private IgniteUuid fileId; - - /** Block ID. */ - private long blockId; - - /** Block affinity key. */ - private IgniteUuid affKey; - - /** Eviction exclude flag. */ - private boolean evictExclude; - - /** - * Constructs file's binary data block key. - * - * @param fileId File ID. - * @param affKey Affinity key. - * @param evictExclude Evict exclude flag. - * @param blockId Block ID. 
- */ - public GridGgfsBlockKey(IgniteUuid fileId, @Nullable IgniteUuid affKey, boolean evictExclude, long blockId) { - assert fileId != null; - assert blockId >= 0; - - this.fileId = fileId; - this.affKey = affKey; - this.evictExclude = evictExclude; - this.blockId = blockId; - } - - /** - * Empty constructor required for {@link Externalizable}. - */ - public GridGgfsBlockKey() { - // No-op. - } - - /** - * @return File ID. - */ - public IgniteUuid getFileId() { - return fileId; - } - - /** - * @return Block affinity key. - */ - public IgniteUuid affinityKey() { - return affKey; - } - - /** - * @return Evict exclude flag. - */ - public boolean evictExclude() { - return evictExclude; - } - - /** - * @return Block ID. - */ - public long getBlockId() { - return blockId; - } - - /** {@inheritDoc} */ - @Override public int compareTo(@NotNull GridGgfsBlockKey o) { - int res = fileId.compareTo(o.fileId); - - if (res != 0) - return res; - - long v1 = blockId; - long v2 = o.blockId; - - if (v1 != v2) - return v1 > v2 ? 1 : -1; - - if (affKey == null && o.affKey == null) - return 0; - - if (affKey != null && o.affKey != null) - return affKey.compareTo(o.affKey); - - return affKey != null ? 
-1 : 1; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeGridUuid(out, fileId); - U.writeGridUuid(out, affKey); - out.writeBoolean(evictExclude); - out.writeLong(blockId); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException { - fileId = U.readGridUuid(in); - affKey = U.readGridUuid(in); - evictExclude = in.readBoolean(); - blockId = in.readLong(); - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return fileId.hashCode() + (int)(blockId ^ (blockId >>> 32)); - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (o == this) - return true; - - if (o == null || o.getClass() != getClass()) - return false; - - GridGgfsBlockKey that = (GridGgfsBlockKey)o; - - return blockId == that.blockId && fileId.equals(that.fileId) && F.eq(affKey, that.affKey) && - evictExclude == that.evictExclude; - } - - /** {@inheritDoc} */ - @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) - @Override public GridTcpCommunicationMessageAdapter clone() { - GridGgfsBlockKey _clone = new GridGgfsBlockKey(); - - clone0(_clone); - - return _clone; - } - - /** {@inheritDoc} */ - @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { - GridGgfsBlockKey _clone = (GridGgfsBlockKey)_msg; - - _clone.fileId = fileId; - _clone.blockId = blockId; - _clone.affKey = affKey; - _clone.evictExclude = evictExclude; - } - - /** {@inheritDoc} */ - @SuppressWarnings("fallthrough") - @Override public boolean writeTo(ByteBuffer buf) { - commState.setBuffer(buf); - - if (!commState.typeWritten) { - if (!commState.putByte(directType())) - return false; - - commState.typeWritten = true; - } - - switch (commState.idx) { - case 0: - if (!commState.putGridUuid(affKey)) - return false; - - commState.idx++; - - case 1: - if (!commState.putLong(blockId)) - return false; - - commState.idx++; - - case 2: - if 
(!commState.putBoolean(evictExclude)) - return false; - - commState.idx++; - - case 3: - if (!commState.putGridUuid(fileId)) - return false; - - commState.idx++; - - } - - return true; - } - - /** {@inheritDoc} */ - @SuppressWarnings("fallthrough") - @Override public boolean readFrom(ByteBuffer buf) { - commState.setBuffer(buf); - - switch (commState.idx) { - case 0: - IgniteUuid affKey0 = commState.getGridUuid(); - - if (affKey0 == GRID_UUID_NOT_READ) - return false; - - affKey = affKey0; - - commState.idx++; - - case 1: - if (buf.remaining() < 8) - return false; - - blockId = commState.getLong(); - - commState.idx++; - - case 2: - if (buf.remaining() < 1) - return false; - - evictExclude = commState.getBoolean(); - - commState.idx++; - - case 3: - IgniteUuid fileId0 = commState.getGridUuid(); - - if (fileId0 == GRID_UUID_NOT_READ) - return false; - - fileId = fileId0; - - commState.idx++; - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public byte directType() { - return 66; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridGgfsBlockKey.class, this); - } -}
