http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/IgniteFsFileImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/IgniteFsFileImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/IgniteFsFileImpl.java deleted file mode 100644 index d69c8be..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/IgniteFsFileImpl.java +++ /dev/null @@ -1,245 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.ggfs; - -import org.apache.ignite.fs.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; - -/** - * File or directory information. - */ -public final class IgniteFsFileImpl implements IgniteFsFile, Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Path to this file. */ - private IgniteFsPath path; - - /** File id. */ - private IgniteUuid fileId; - - /** Block size. */ - private int blockSize; - - /** Group block size. */ - private long grpBlockSize; - - /** File length. */ - private long len; - - /** Last access time. */ - private long accessTime; - - /** Last modification time. */ - private long modificationTime; - - /** Properties. */ - private Map<String, String> props; - - /** - * Empty constructor required by {@link Externalizable}. - */ - public IgniteFsFileImpl() { - // No-op. - } - - /** - * Constructs directory info. - * - * @param path Path. - */ - public IgniteFsFileImpl(IgniteFsPath path, GridGgfsFileInfo info, long globalGrpBlockSize) { - A.notNull(path, "path"); - A.notNull(info, "info"); - - this.path = path; - fileId = info.id(); - - if (info.isFile()) { - blockSize = info.blockSize(); - len = info.length(); - - grpBlockSize = info.affinityKey() == null ? globalGrpBlockSize : - info.length() == 0 ? globalGrpBlockSize : info.length(); - } - - props = info.properties(); - - if (props == null) - props = Collections.emptyMap(); - - accessTime = info.accessTime(); - modificationTime = info.modificationTime(); - } - - /** - * Constructs file instance. - * - * @param path Path. - * @param entry Listing entry. - */ - public IgniteFsFileImpl(IgniteFsPath path, GridGgfsListingEntry entry, long globalGrpSize) { - A.notNull(path, "path"); - A.notNull(entry, "entry"); - - this.path = path; - fileId = entry.fileId(); - - blockSize = entry.blockSize(); - - grpBlockSize = entry.affinityKey() == null ? globalGrpSize : - entry.length() == 0 ? globalGrpSize : entry.length(); - - len = entry.length(); - props = entry.properties(); - - accessTime = entry.accessTime(); - modificationTime = entry.modificationTime(); - } - - /** {@inheritDoc} */ - @Override public IgniteFsPath path() { - return path; - } - - /** - * @return File ID. - */ - public IgniteUuid fileId() { - return fileId; - } - - /** {@inheritDoc} */ - @Override public boolean isFile() { - return blockSize > 0; - } - - /** {@inheritDoc} */ - @Override public boolean isDirectory() { - return blockSize == 0; - } - - /** {@inheritDoc} */ - @Override public long length() { - return len; - } - - /** {@inheritDoc} */ - @Override public int blockSize() { - return blockSize; - } - - /** {@inheritDoc} */ - @Override public long groupBlockSize() { - return grpBlockSize; - } - - /** {@inheritDoc} */ - @Override public long accessTime() { - return accessTime; - } - - /** {@inheritDoc} */ - @Override public long modificationTime() { - return modificationTime; - } - - /** {@inheritDoc} */ - @Override public String property(String name) throws IllegalArgumentException { - String val = props.get(name); - - if (val == null) - throw new IllegalArgumentException("File property not found [path=" + path + ", name=" + name + ']'); - - return val; - } - - /** {@inheritDoc} */ - @Override public String property(String name, @Nullable String dfltVal) { - String val = props.get(name); - - return val == null ? dfltVal : val; - } - - /** {@inheritDoc} */ - @Override public Map<String, String> properties() { - return props; - } - - /** - * Writes object to data output. - * - * @param out Data output. - */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - path.writeExternal(out); - - out.writeInt(blockSize); - out.writeLong(grpBlockSize); - out.writeLong(len); - U.writeStringMap(out, props); - out.writeLong(accessTime); - out.writeLong(modificationTime); - } - - /** - * Reads object from data input. - * - * @param in Data input. - */ - @Override public void readExternal(ObjectInput in) throws IOException { - path = new IgniteFsPath(); - - path.readExternal(in); - - blockSize = in.readInt(); - grpBlockSize = in.readLong(); - len = in.readLong(); - props = U.readStringMap(in); - accessTime = in.readLong(); - modificationTime = in.readLong(); - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return path.hashCode(); - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (o == this) - return true; - - if (o == null || getClass() != o.getClass()) - return false; - - IgniteFsFileImpl that = (IgniteFsFileImpl)o; - - return path.equals(that.path); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(IgniteFsFileImpl.class, this); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/IgniteFsMetricsAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/IgniteFsMetricsAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/IgniteFsMetricsAdapter.java deleted file mode 100644 index e28a972..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/IgniteFsMetricsAdapter.java +++ /dev/null @@ -1,239 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.ggfs; - -import org.apache.ignite.fs.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.io.*; - -/** - * GGFS metrics adapter. - */ -public class IgniteFsMetricsAdapter implements IgniteFsMetrics, Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Used space on local node. */ - private long locSpaceSize; - - /** Maximum space. */ - private long maxSpaceSize; - - /** Secondary file system used space. */ - private long secondarySpaceSize; - - /** Number of directories. */ - private int dirsCnt; - - /** Number of files. */ - private int filesCnt; - - /** Number of files opened for read. */ - private int filesOpenedForRead; - - /** Number of files opened for write. */ - private int filesOpenedForWrite; - - /** Total blocks read. */ - private long blocksReadTotal; - - /** Total blocks remote read. */ - private long blocksReadRmt; - - /** Total blocks write. */ - private long blocksWrittenTotal; - - /** Total blocks write remote. */ - private long blocksWrittenRmt; - - /** Total bytes read. */ - private long bytesRead; - - /** Total bytes read time. */ - private long bytesReadTime; - - /** Total bytes write. */ - private long bytesWritten; - - /** Total bytes write time. */ - private long bytesWriteTime; - - /** - * {@link Externalizable} support. - */ - public IgniteFsMetricsAdapter() { - // No-op. - } - - /** - * @param locSpaceSize Used space on local node. - * @param maxSpaceSize Maximum space size. - * @param secondarySpaceSize Secondary space size. - * @param dirsCnt Number of directories. - * @param filesCnt Number of files. - * @param filesOpenedForRead Number of files opened for read. - * @param filesOpenedForWrite Number of files opened for write. - * @param blocksReadTotal Total blocks read. - * @param blocksReadRmt Total blocks read remotely. - * @param blocksWrittenTotal Total blocks written. - * @param blocksWrittenRmt Total blocks written remotely. - * @param bytesRead Total bytes read. - * @param bytesReadTime Total bytes read time. - * @param bytesWritten Total bytes written. - * @param bytesWriteTime Total bytes write time. - */ - public IgniteFsMetricsAdapter(long locSpaceSize, long maxSpaceSize, long secondarySpaceSize, int dirsCnt, - int filesCnt, int filesOpenedForRead, int filesOpenedForWrite, long blocksReadTotal, long blocksReadRmt, - long blocksWrittenTotal, long blocksWrittenRmt, long bytesRead, long bytesReadTime, long bytesWritten, - long bytesWriteTime) { - this.locSpaceSize = locSpaceSize; - this.maxSpaceSize = maxSpaceSize; - this.secondarySpaceSize = secondarySpaceSize; - this.dirsCnt = dirsCnt; - this.filesCnt = filesCnt; - this.filesOpenedForRead = filesOpenedForRead; - this.filesOpenedForWrite = filesOpenedForWrite; - this.blocksReadTotal = blocksReadTotal; - this.blocksReadRmt = blocksReadRmt; - this.blocksWrittenTotal = blocksWrittenTotal; - this.blocksWrittenRmt = blocksWrittenRmt; - this.bytesRead = bytesRead; - this.bytesReadTime = bytesReadTime; - this.bytesWritten = bytesWritten; - this.bytesWriteTime = bytesWriteTime; - } - - /** {@inheritDoc} */ - @Override public long localSpaceSize() { - return locSpaceSize; - } - - /** {@inheritDoc} */ - @Override public long maxSpaceSize() { - return maxSpaceSize; - } - - /** {@inheritDoc} */ - @Override public long secondarySpaceSize() { - return secondarySpaceSize; - } - - /** {@inheritDoc} */ - @Override public int directoriesCount() { - return dirsCnt; - } - - /** {@inheritDoc} */ - @Override public int filesCount() { - return filesCnt; - } - - /** {@inheritDoc} */ - @Override public int filesOpenedForRead() { - return filesOpenedForRead; - } - - /** {@inheritDoc} */ - @Override public int filesOpenedForWrite() { - return filesOpenedForWrite; - } - - /** {@inheritDoc} */ - @Override public long blocksReadTotal() { - return blocksReadTotal; - } - - /** {@inheritDoc} */ - @Override public long blocksReadRemote() { - return blocksReadRmt; - } - - /** {@inheritDoc} */ - @Override public long blocksWrittenTotal() { - return blocksWrittenTotal; - } - - /** {@inheritDoc} */ - @Override public long blocksWrittenRemote() { - return blocksWrittenRmt; - } - - /** {@inheritDoc} */ - @Override public long bytesRead() { - return bytesRead; - } - - /** {@inheritDoc} */ - @Override public long bytesReadTime() { - return bytesReadTime; - } - - /** {@inheritDoc} */ - @Override public long bytesWritten() { - return bytesWritten; - } - - /** {@inheritDoc} */ - @Override public long bytesWriteTime() { - return bytesWriteTime; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeLong(locSpaceSize); - out.writeLong(maxSpaceSize); - out.writeLong(secondarySpaceSize); - out.writeInt(dirsCnt); - out.writeInt(filesCnt); - out.writeInt(filesOpenedForRead); - out.writeInt(filesOpenedForWrite); - out.writeLong(blocksReadTotal); - out.writeLong(blocksReadRmt); - out.writeLong(blocksWrittenTotal); - out.writeLong(blocksWrittenRmt); - out.writeLong(bytesRead); - out.writeLong(bytesReadTime); - out.writeLong(bytesWritten); - out.writeLong(bytesWriteTime); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException { - locSpaceSize = in.readLong(); - maxSpaceSize = in.readLong(); - secondarySpaceSize = in.readLong(); - dirsCnt = in.readInt(); - filesCnt = in.readInt(); - filesOpenedForRead = in.readInt(); - filesOpenedForWrite = in.readInt(); - blocksReadTotal = in.readLong(); - blocksReadRmt = in.readLong(); - blocksWrittenTotal = in.readLong(); - blocksWrittenRmt = in.readLong(); - bytesRead = in.readLong(); - bytesReadTime = in.readLong(); - bytesWritten = in.readLong(); - bytesWriteTime = in.readLong(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(IgniteFsMetricsAdapter.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/IgniteFsOutputStreamAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/IgniteFsOutputStreamAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/IgniteFsOutputStreamAdapter.java deleted file mode 100644 index cffcadd..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/IgniteFsOutputStreamAdapter.java +++ /dev/null @@ -1,263 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.ggfs; - -import org.apache.ignite.*; -import org.apache.ignite.fs.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.nio.*; - -/** - * Output stream to store data into grid cache with separate blocks. - */ -@SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext") -abstract class IgniteFsOutputStreamAdapter extends IgniteFsOutputStream { - /** Path to file. */ - protected final IgniteFsPath path; - - /** Buffer size. */ - private final int bufSize; - - /** Flag for this stream open/closed state. */ - private boolean closed; - - /** Local buffer to store stream data as consistent block. */ - private ByteBuffer buf; - - /** Bytes written. */ - @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") - protected long bytes; - - /** Time consumed by write operations. */ - protected long time; - - /** - * Constructs file output stream. - * - * @param path Path to stored file. - * @param bufSize The size of the buffer to be used. - */ - IgniteFsOutputStreamAdapter(IgniteFsPath path, int bufSize) { - assert path != null; - assert bufSize > 0; - - this.path = path; - this.bufSize = bufSize; - } - - /** - * Gets number of written bytes. - * - * @return Written bytes. - */ - public long bytes() { - return bytes; - } - - /** {@inheritDoc} */ - @Override public synchronized void write(int b) throws IOException { - checkClosed(null, 0); - - long startTime = System.nanoTime(); - - b &= 0xFF; - - if (buf == null) - buf = ByteBuffer.allocate(bufSize); - - buf.put((byte)b); - - if (buf.position() >= bufSize) - sendData(true); // Send data to server. - - time += System.nanoTime() - startTime; - } - - /** {@inheritDoc} */ - @Override public synchronized void write(byte[] b, int off, int len) throws IOException { - A.notNull(b, "b"); - - if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) { - throw new IndexOutOfBoundsException("Invalid bounds [data.length=" + b.length + ", offset=" + off + - ", length=" + len + ']'); - } - - checkClosed(null, 0); - - if (len == 0) - return; // Done. - - long startTime = System.nanoTime(); - - if (buf == null) { - // Do not allocate and copy byte buffer if will send data immediately. - if (len >= bufSize) { - buf = ByteBuffer.wrap(b, off, len); - - sendData(false); - - return; - } - - buf = ByteBuffer.allocate(Math.max(bufSize, len)); - } - - if (buf.remaining() < len) - // Expand buffer capacity, if remaining size is less then data size. - buf = ByteBuffer.allocate(buf.position() + len).put((ByteBuffer)buf.flip()); - - assert len <= buf.remaining() : "Expects write data size less or equal then remaining buffer capacity " + - "[len=" + len + ", buf.remaining=" + buf.remaining() + ']'; - - buf.put(b, off, len); - - if (buf.position() >= bufSize) - sendData(true); // Send data to server. - - time += System.nanoTime() - startTime; - } - - /** {@inheritDoc} */ - @Override public synchronized void transferFrom(DataInput in, int len) throws IOException { - checkClosed(in, len); - - long startTime = System.nanoTime(); - - // Send all IPC data from the local buffer before streaming. - if (buf != null && buf.position() > 0) - sendData(true); - - try { - storeDataBlocks(in, len); - } - catch (IgniteCheckedException e) { - throw new IOException(e.getMessage(), e); - } - - time += System.nanoTime() - startTime; - } - - /** - * Flushes this output stream and forces any buffered output bytes to be written out. - * - * @exception IOException if an I/O error occurs. - */ - @Override public synchronized void flush() throws IOException { - checkClosed(null, 0); - - // Send all IPC data from the local buffer. - if (buf != null && buf.position() > 0) - sendData(true); - } - - /** {@inheritDoc} */ - @Override public final synchronized void close() throws IOException { - // Do nothing if stream is already closed. - if (closed) - return; - - try { - // Send all IPC data from the local buffer. - try { - flush(); - } - finally { - onClose(); // "onClose()" routine must be invoked anyway! - } - } - finally { - // Mark this stream closed AFTER flush. - closed = true; - } - } - - /** - * Store data blocks in file.<br/> - * Note! If file concurrently deleted we'll get lost blocks. - * - * @param data Data to store. - * @throws IgniteCheckedException If failed. - */ - protected abstract void storeDataBlock(ByteBuffer data) throws IgniteCheckedException, IOException; - - /** - * Store data blocks in file reading appropriate number of bytes from given data input. - * - * @param in Data input to read from. - * @param len Data length to store. - * @throws IgniteCheckedException If failed. - */ - protected abstract void storeDataBlocks(DataInput in, int len) throws IgniteCheckedException, IOException; - - /** - * Close callback. It will be called only once in synchronized section. - * - * @throws IOException If failed. - */ - protected void onClose() throws IOException { - // No-op. - } - - /** - * Validate this stream is open. - * - * @throws IOException If this stream is closed. - */ - private void checkClosed(@Nullable DataInput in, int len) throws IOException { - assert Thread.holdsLock(this); - - if (closed) { - // Must read data from stream before throwing exception. - if (in != null) - in.skipBytes(len); - - throw new IOException("Stream has been closed: " + this); - } - } - - /** - * Send all local-buffered data to server. - * - * @param flip Whether to flip buffer on sending data. We do not want to flip it if sending wrapped - * byte array. - * @throws IOException In case of IO exception. - */ - private void sendData(boolean flip) throws IOException { - assert Thread.holdsLock(this); - - try { - if (flip) - buf.flip(); - - storeDataBlock(buf); - } - catch (IgniteCheckedException e) { - throw new IOException("Failed to store data into file: " + path, e); - } - - buf = null; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(IgniteFsOutputStreamAdapter.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/IgniteFsOutputStreamImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/IgniteFsOutputStreamImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/IgniteFsOutputStreamImpl.java deleted file mode 100644 index c268ef1..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/IgniteFsOutputStreamImpl.java +++ /dev/null @@ -1,504 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.ggfs; - -import org.apache.ignite.*; -import org.apache.ignite.fs.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.internal.processors.task.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.internal.util.future.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.nio.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.fs.IgniteFsMode.*; - -/** - * Output stream to store data into grid cache with separate blocks. - */ -class IgniteFsOutputStreamImpl extends IgniteFsOutputStreamAdapter { - /** Maximum number of blocks in buffer. */ - private static final int MAX_BLOCKS_CNT = 16; - - /** GGFS context. */ - private GridGgfsContext ggfsCtx; - - /** Meta info manager. */ - private final GridGgfsMetaManager meta; - - /** Data manager. */ - private final GridGgfsDataManager data; - - /** File descriptor. */ - @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") - private GridGgfsFileInfo fileInfo; - - /** Parent ID. */ - private final IgniteUuid parentId; - - /** File name. */ - private final String fileName; - - /** Space in file to write data. */ - @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") - private long space; - - /** Intermediate remainder to keep data. */ - private byte[] remainder; - - /** Data length in remainder. */ - private int remainderDataLen; - - /** Write completion future. */ - private final IgniteFuture<Boolean> writeCompletionFut; - - /** GGFS mode. */ - private final IgniteFsMode mode; - - /** File worker batch. */ - private final GridGgfsFileWorkerBatch batch; - - /** Ensures that onClose)_ routine is called no more than once. */ - private final AtomicBoolean onCloseGuard = new AtomicBoolean(); - - /** Local GGFS metrics. */ - private final GridGgfsLocalMetrics metrics; - - /** Affinity written by this output stream. */ - private GridGgfsFileAffinityRange streamRange; - - /** - * Constructs file output stream. - * - * @param ggfsCtx GGFS context. - * @param path Path to stored file. - * @param fileInfo File info to write binary data to. - * @param bufSize The size of the buffer to be used. - * @param mode Grid GGFS mode. - * @param batch Optional secondary file system batch. - * @param metrics Local GGFs metrics. - * @throws IgniteCheckedException If stream creation failed. - */ - IgniteFsOutputStreamImpl(GridGgfsContext ggfsCtx, IgniteFsPath path, GridGgfsFileInfo fileInfo, IgniteUuid parentId, - int bufSize, IgniteFsMode mode, @Nullable GridGgfsFileWorkerBatch batch, GridGgfsLocalMetrics metrics) - throws IgniteCheckedException { - super(path, optimizeBufferSize(bufSize, fileInfo)); - - assert fileInfo != null; - assert fileInfo.isFile() : "Unexpected file info: " + fileInfo; - assert mode != null && mode != PROXY; - assert mode == PRIMARY && batch == null || batch != null; - assert metrics != null; - - // File hasn't been locked. - if (fileInfo.lockId() == null) - throw new IgniteFsException("Failed to acquire file lock (concurrently modified?): " + path); - - this.ggfsCtx = ggfsCtx; - meta = ggfsCtx.meta(); - data = ggfsCtx.data(); - - this.fileInfo = fileInfo; - this.mode = mode; - this.batch = batch; - this.parentId = parentId; - this.metrics = metrics; - - streamRange = initialStreamRange(fileInfo); - - fileName = path.name(); - - writeCompletionFut = data.writeStart(fileInfo); - } - - /** - * Optimize buffer size. - * - * @param bufSize Requested buffer size. - * @param fileInfo File info. - * @return Optimized buffer size. - */ - @SuppressWarnings("IfMayBeConditional") - private static int optimizeBufferSize(int bufSize, GridGgfsFileInfo fileInfo) { - assert bufSize > 0; - - if (fileInfo == null) - return bufSize; - - int blockSize = fileInfo.blockSize(); - - if (blockSize <= 0) - return bufSize; - - if (bufSize <= blockSize) - // Optimize minimum buffer size to be equal file's block size. - return blockSize; - - int maxBufSize = blockSize * MAX_BLOCKS_CNT; - - if (bufSize > maxBufSize) - // There is no profit or optimization from larger buffers. - return maxBufSize; - - if (fileInfo.length() == 0) - // Make buffer size multiple of block size (optimized for new files). - return bufSize / blockSize * blockSize; - - return bufSize; - } - - /** {@inheritDoc} */ - @Override protected synchronized void storeDataBlock(ByteBuffer block) throws IgniteCheckedException, IOException { - int writeLen = block.remaining(); - - preStoreDataBlocks(null, writeLen); - - int blockSize = fileInfo.blockSize(); - - // If data length is not enough to fill full block, fill the remainder and return. - if (remainderDataLen + writeLen < blockSize) { - if (remainder == null) - remainder = new byte[blockSize]; - else if (remainder.length != blockSize) { - assert remainderDataLen == remainder.length; - - byte[] allocated = new byte[blockSize]; - - U.arrayCopy(remainder, 0, allocated, 0, remainder.length); - - remainder = allocated; - } - - block.get(remainder, remainderDataLen, writeLen); - - remainderDataLen += writeLen; - } - else { - remainder = data.storeDataBlocks(fileInfo, fileInfo.length() + space, remainder, remainderDataLen, block, - false, streamRange, batch); - - remainderDataLen = remainder == null ? 0 : remainder.length; - } - } - - /** {@inheritDoc} */ - @Override protected synchronized void storeDataBlocks(DataInput in, int len) throws IgniteCheckedException, IOException { - preStoreDataBlocks(in, len); - - int blockSize = fileInfo.blockSize(); - - // If data length is not enough to fill full block, fill the remainder and return. - if (remainderDataLen + len < blockSize) { - if (remainder == null) - remainder = new byte[blockSize]; - else if (remainder.length != blockSize) { - assert remainderDataLen == remainder.length; - - byte[] allocated = new byte[blockSize]; - - U.arrayCopy(remainder, 0, allocated, 0, remainder.length); - - remainder = allocated; - } - - in.readFully(remainder, remainderDataLen, len); - - remainderDataLen += len; - } - else { - remainder = data.storeDataBlocks(fileInfo, fileInfo.length() + space, remainder, remainderDataLen, in, len, - false, streamRange, batch); - - remainderDataLen = remainder == null ? 0 : remainder.length; - } - } - - /** - * Initializes data loader if it was not initialized yet and updates written space. - * - * @param len Data length to be written. - */ - private void preStoreDataBlocks(@Nullable DataInput in, int len) throws IgniteCheckedException, IOException { - // Check if any exception happened while writing data. - if (writeCompletionFut.isDone()) { - assert ((GridFutureAdapter)writeCompletionFut).isFailed(); - - if (in != null) - in.skipBytes(len); - - writeCompletionFut.get(); - } - - bytes += len; - space += len; - } - - /** - * Flushes this output stream and forces any buffered output bytes to be written out. - * - * @exception IOException if an I/O error occurs. - */ - @Override public synchronized void flush() throws IOException { - boolean exists; - - try { - exists = meta.exists(fileInfo.id()); - } - catch (IgniteCheckedException e) { - throw new IOError(e); // Something unrecoverable. - } - - if (!exists) { - onClose(true); - - throw new IOException("File was concurrently deleted: " + path); - } - - super.flush(); - - try { - if (remainder != null) { - data.storeDataBlocks(fileInfo, fileInfo.length() + space, null, 0, - ByteBuffer.wrap(remainder, 0, remainderDataLen), true, streamRange, batch); - - remainder = null; - remainderDataLen = 0; - } - - if (space > 0) { - GridGgfsFileInfo fileInfo0 = meta.updateInfo(fileInfo.id(), - new ReserveSpaceClosure(space, streamRange)); - - if (fileInfo0 == null) - throw new IOException("File was concurrently deleted: " + path); - else - fileInfo = fileInfo0; - - streamRange = initialStreamRange(fileInfo); - - space = 0; - } - } - catch (IgniteCheckedException e) { - throw new IOException("Failed to flush data [path=" + path + ", space=" + space + ']', e); - } - } - - /** {@inheritDoc} */ - @Override protected void onClose() throws IOException { - onClose(false); - } - - /** - * Close callback. It will be called only once in synchronized section. - * - * @param deleted Whether we already know that the file was deleted. - * @throws IOException If failed. - */ - private void onClose(boolean deleted) throws IOException { - assert Thread.holdsLock(this); - - if (onCloseGuard.compareAndSet(false, true)) { - // Notify backing secondary file system batch to finish. - if (mode != PRIMARY) { - assert batch != null; - - batch.finish(); - } - - // Ensure file existence. - boolean exists; - - try { - exists = !deleted && meta.exists(fileInfo.id()); - } - catch (IgniteCheckedException e) { - throw new IOError(e); // Something unrecoverable. - } - - if (exists) { - IOException err = null; - - try { - data.writeClose(fileInfo); - - writeCompletionFut.get(); - } - catch (IgniteCheckedException e) { - err = new IOException("Failed to close stream [path=" + path + ", fileInfo=" + fileInfo + ']', e); - } - - metrics.addWrittenBytesTime(bytes, time); - - // Await secondary file system processing to finish. - if (mode == DUAL_SYNC) { - try { - batch.await(); - } - catch (IgniteCheckedException e) { - if (err == null) - err = new IOException("Failed to close secondary file system stream [path=" + path + - ", fileInfo=" + fileInfo + ']', e); - } - } - - long modificationTime = System.currentTimeMillis(); - - try { - meta.unlock(fileInfo, modificationTime); - } - catch (IgniteFsFileNotFoundException ignore) { - data.delete(fileInfo); // Safety to ensure that all data blocks are deleted. - - throw new IOException("File was concurrently deleted: " + path); - } - catch (IgniteCheckedException e) { - throw new IOError(e); // Something unrecoverable. - } - - meta.updateParentListingAsync(parentId, fileInfo.id(), fileName, bytes, modificationTime); - - if (err != null) - throw err; - } - else { - try { - if (mode == DUAL_SYNC) - batch.await(); - } - catch (IgniteCheckedException e) { - throw new IOException("Failed to close secondary file system stream [path=" + path + - ", fileInfo=" + fileInfo + ']', e); - } - finally { - data.delete(fileInfo); - } - } - } - } - - /** - * Gets initial affinity range. This range will have 0 length and will start from first - * non-occupied file block. - * - * @param fileInfo File info to build initial range for. - * @return Affinity range. - */ - private GridGgfsFileAffinityRange initialStreamRange(GridGgfsFileInfo fileInfo) { - if (!ggfsCtx.configuration().isFragmentizerEnabled()) - return null; - - if (!Boolean.parseBoolean(fileInfo.properties().get(IgniteFs.PROP_PREFER_LOCAL_WRITES))) - return null; - - int blockSize = fileInfo.blockSize(); - - // Find first non-occupied block offset. - long off = ((fileInfo.length() + blockSize - 1) / blockSize) * blockSize; - - // Need to get last affinity key and reuse it if we are on the same node. - long lastBlockOff = off - fileInfo.blockSize(); - - if (lastBlockOff < 0) - lastBlockOff = 0; - - GridGgfsFileMap map = fileInfo.fileMap(); - - IgniteUuid prevAffKey = map == null ? null : map.affinityKey(lastBlockOff, false); - - IgniteUuid affKey = data.nextAffinityKey(prevAffKey); - - return affKey == null ? null : new GridGgfsFileAffinityRange(off, off, affKey); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(IgniteFsOutputStreamImpl.class, this); - } - - /** - * Helper closure to reserve specified space and update file's length - */ - @GridInternal - private static final class ReserveSpaceClosure implements IgniteClosure<GridGgfsFileInfo, GridGgfsFileInfo>, - Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Space amount (bytes number) to increase file's length. */ - private long space; - - /** Affinity range for this particular update. */ - private GridGgfsFileAffinityRange range; - - /** - * Empty constructor required for {@link Externalizable}. - * - */ - public ReserveSpaceClosure() { - // No-op. - } - - /** - * Constructs the closure to reserve specified space and update file's length. - * - * @param space Space amount (bytes number) to increase file's length. - * @param range Affinity range specifying which part of file was colocated. - */ - private ReserveSpaceClosure(long space, GridGgfsFileAffinityRange range) { - this.space = space; - this.range = range; - } - - /** {@inheritDoc} */ - @Override public GridGgfsFileInfo apply(GridGgfsFileInfo oldInfo) { - GridGgfsFileMap oldMap = oldInfo.fileMap(); - - GridGgfsFileMap newMap = new GridGgfsFileMap(oldMap); - - newMap.addRange(range); - - // Update file length. - GridGgfsFileInfo updated = new GridGgfsFileInfo(oldInfo, oldInfo.length() + space); - - updated.fileMap(newMap); - - return updated; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeLong(space); - out.writeObject(range); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - space = in.readLong(); - range = (GridGgfsFileAffinityRange)in.readObject(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(ReserveSpaceClosure.class, this); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/IgniteFsTaskArgsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/IgniteFsTaskArgsImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/IgniteFsTaskArgsImpl.java deleted file mode 100644 index d883558..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/IgniteFsTaskArgsImpl.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.ggfs; - -import org.apache.ignite.fs.*; -import org.apache.ignite.fs.mapreduce.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.io.*; -import java.util.*; - -/** - * GGFS task arguments implementation. - */ -public class IgniteFsTaskArgsImpl<T> implements IgniteFsTaskArgs<T>, Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** GGFS name. */ - private String ggfsName; - - /** Paths. */ - private Collection<IgniteFsPath> paths; - - /** Record resolver. */ - private IgniteFsRecordResolver recRslvr; - - /** Skip non existent files flag. */ - private boolean skipNonExistentFiles; - - /** Maximum range length. */ - private long maxRangeLen; - - /** User argument. */ - private T usrArg; - - /** - * {@link Externalizable} support. - */ - public IgniteFsTaskArgsImpl() { - // No-op. - } - - /** - * Constructor. - * - * @param ggfsName GGFS name. - * @param paths Paths. - * @param recRslvr Record resolver. - * @param skipNonExistentFiles Skip non existent files flag. - * @param maxRangeLen Maximum range length. - * @param usrArg User argument. - */ - public IgniteFsTaskArgsImpl(String ggfsName, Collection<IgniteFsPath> paths, IgniteFsRecordResolver recRslvr, - boolean skipNonExistentFiles, long maxRangeLen, T usrArg) { - this.ggfsName = ggfsName; - this.paths = paths; - this.recRslvr = recRslvr; - this.skipNonExistentFiles = skipNonExistentFiles; - this.maxRangeLen = maxRangeLen; - this.usrArg = usrArg; - } - - /** {@inheritDoc} */ - @Override public String ggfsName() { - return ggfsName; - } - - /** {@inheritDoc} */ - @Override public Collection<IgniteFsPath> paths() { - return paths; - } - - /** {@inheritDoc} */ - @Override public IgniteFsRecordResolver recordResolver() { - return recRslvr; - } - - /** {@inheritDoc} */ - @Override public boolean skipNonExistentFiles() { - return skipNonExistentFiles; - } - - /** {@inheritDoc} */ - @Override public long maxRangeLength() { - return maxRangeLen; - } - - /** {@inheritDoc} */ - @Override public T userArgument() { - return usrArg; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(IgniteFsTaskArgsImpl.class, this); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeString(out, ggfsName); - U.writeCollection(out, paths); - - out.writeObject(recRslvr); - out.writeBoolean(skipNonExistentFiles); - out.writeLong(maxRangeLen); - out.writeObject(usrArg); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - ggfsName = U.readString(in); - paths = U.readCollection(in); - - recRslvr = (IgniteFsRecordResolver)in.readObject(); - skipNonExistentFiles = in.readBoolean(); - maxRangeLen = in.readLong(); - usrArg = (T)in.readObject(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/package.html b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/package.html deleted file mode 100644 index 95b8af7..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/package.html +++ /dev/null @@ -1,23 +0,0 @@ -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - --> -<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> -<html> -<body> - <!-- Package description. --> - Contains high performance file system processer. -</body> -</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/GridCacheGgfsPerBlockLruEvictionPolicySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/GridCacheGgfsPerBlockLruEvictionPolicySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/GridCacheGgfsPerBlockLruEvictionPolicySelfTest.java new file mode 100644 index 0000000..87b4246 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/GridCacheGgfsPerBlockLruEvictionPolicySelfTest.java @@ -0,0 +1,489 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.fs; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.eviction.ggfs.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.fs.*; +import org.gridgain.grid.kernal.processors.cache.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.gridgain.testframework.*; + +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.fs.IgniteFsMode.*; + +/** + * Tests for GGFS per-block LR eviction policy. + */ +@SuppressWarnings({"ConstantConditions", "ThrowableResultOfMethodCallIgnored"}) +public class GridCacheGgfsPerBlockLruEvictionPolicySelfTest extends GridGgfsCommonAbstractTest { + /** Primary GGFS name. */ + private static final String GGFS_PRIMARY = "ggfs-primary"; + + /** Primary GGFS name. */ + private static final String GGFS_SECONDARY = "ggfs-secondary"; + + /** Secondary file system REST endpoint configuration map. */ + private static final Map<String, String> SECONDARY_REST_CFG = new HashMap<String, String>() {{ + put("type", "tcp"); + put("port", "11500"); + }}; + + /** File working in PRIMARY mode. */ + public static final IgniteFsPath FILE = new IgniteFsPath("/file"); + + /** File working in DUAL mode. */ + public static final IgniteFsPath FILE_RMT = new IgniteFsPath("/fileRemote"); + + /** Primary GGFS instances. */ + private static GridGgfsImpl ggfsPrimary; + + /** Secondary GGFS instance. */ + private static IgniteFs secondaryFs; + + /** Primary file system data cache. */ + private static GridCacheAdapter<GridGgfsBlockKey, byte[]> dataCache; + + /** Eviction policy */ + private static GridCacheGgfsPerBlockLruEvictionPolicy evictPlc; + + /** + * Start a grid with the primary file system. + * + * @throws Exception If failed. + */ + private void startPrimary() throws Exception { + IgniteFsConfiguration ggfsCfg = new IgniteFsConfiguration(); + + ggfsCfg.setDataCacheName("dataCache"); + ggfsCfg.setMetaCacheName("metaCache"); + ggfsCfg.setName(GGFS_PRIMARY); + ggfsCfg.setBlockSize(512); + ggfsCfg.setDefaultMode(PRIMARY); + ggfsCfg.setPrefetchBlocks(1); + ggfsCfg.setSequentialReadsBeforePrefetch(Integer.MAX_VALUE); + ggfsCfg.setSecondaryFileSystem(secondaryFs); + + Map<String, IgniteFsMode> pathModes = new HashMap<>(); + + pathModes.put(FILE_RMT.toString(), DUAL_SYNC); + + ggfsCfg.setPathModes(pathModes); + + CacheConfiguration dataCacheCfg = defaultCacheConfiguration(); + + dataCacheCfg.setName("dataCache"); + dataCacheCfg.setCacheMode(PARTITIONED); + dataCacheCfg.setDistributionMode(GridCacheDistributionMode.PARTITIONED_ONLY); + dataCacheCfg.setWriteSynchronizationMode(GridCacheWriteSynchronizationMode.FULL_SYNC); + dataCacheCfg.setAtomicityMode(TRANSACTIONAL); + + evictPlc = new GridCacheGgfsPerBlockLruEvictionPolicy(); + + dataCacheCfg.setEvictionPolicy(evictPlc); + dataCacheCfg.setAffinityMapper(new IgniteFsGroupDataBlocksKeyMapper(128)); + dataCacheCfg.setBackups(0); + dataCacheCfg.setQueryIndexEnabled(false); + + CacheConfiguration metaCacheCfg = defaultCacheConfiguration(); + + metaCacheCfg.setName("metaCache"); + metaCacheCfg.setCacheMode(REPLICATED); + metaCacheCfg.setDistributionMode(GridCacheDistributionMode.PARTITIONED_ONLY); + metaCacheCfg.setWriteSynchronizationMode(GridCacheWriteSynchronizationMode.FULL_SYNC); + metaCacheCfg.setQueryIndexEnabled(false); + metaCacheCfg.setAtomicityMode(TRANSACTIONAL); + + IgniteConfiguration cfg = new IgniteConfiguration(); + + cfg.setGridName("grid-primary"); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true)); + + cfg.setDiscoverySpi(discoSpi); + cfg.setCacheConfiguration(dataCacheCfg, metaCacheCfg); + cfg.setGgfsConfiguration(ggfsCfg); + + cfg.setLocalHost("127.0.0.1"); + cfg.setRestEnabled(false); + + Ignite g = G.start(cfg); + + ggfsPrimary = (GridGgfsImpl)g.fileSystem(GGFS_PRIMARY); + + dataCache = ggfsPrimary.context().kernalContext().cache().internalCache( + ggfsPrimary.context().configuration().getDataCacheName()); + } + + /** + * Start a grid with the secondary file system. + * + * @throws Exception If failed. + */ + private void startSecondary() throws Exception { + IgniteFsConfiguration ggfsCfg = new IgniteFsConfiguration(); + + ggfsCfg.setDataCacheName("dataCache"); + ggfsCfg.setMetaCacheName("metaCache"); + ggfsCfg.setName(GGFS_SECONDARY); + ggfsCfg.setBlockSize(512); + ggfsCfg.setDefaultMode(PRIMARY); + ggfsCfg.setIpcEndpointConfiguration(SECONDARY_REST_CFG); + + CacheConfiguration dataCacheCfg = defaultCacheConfiguration(); + + dataCacheCfg.setName("dataCache"); + dataCacheCfg.setCacheMode(PARTITIONED); + dataCacheCfg.setDistributionMode(GridCacheDistributionMode.PARTITIONED_ONLY); + dataCacheCfg.setWriteSynchronizationMode(GridCacheWriteSynchronizationMode.FULL_SYNC); + dataCacheCfg.setAffinityMapper(new IgniteFsGroupDataBlocksKeyMapper(128)); + dataCacheCfg.setBackups(0); + dataCacheCfg.setQueryIndexEnabled(false); + dataCacheCfg.setAtomicityMode(TRANSACTIONAL); + + CacheConfiguration metaCacheCfg = defaultCacheConfiguration(); + + metaCacheCfg.setName("metaCache"); + metaCacheCfg.setCacheMode(REPLICATED); + metaCacheCfg.setDistributionMode(GridCacheDistributionMode.PARTITIONED_ONLY); + metaCacheCfg.setWriteSynchronizationMode(GridCacheWriteSynchronizationMode.FULL_SYNC); + metaCacheCfg.setQueryIndexEnabled(false); + metaCacheCfg.setAtomicityMode(TRANSACTIONAL); + + IgniteConfiguration cfg = new IgniteConfiguration(); + + cfg.setGridName("grid-secondary"); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true)); + + cfg.setDiscoverySpi(discoSpi); + cfg.setCacheConfiguration(dataCacheCfg, metaCacheCfg); + cfg.setGgfsConfiguration(ggfsCfg); + + cfg.setLocalHost("127.0.0.1"); + cfg.setRestEnabled(false); + + Ignite g = G.start(cfg); + + secondaryFs = g.fileSystem(GGFS_SECONDARY); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + try { + // Cleanup. + ggfsPrimary.format(); + + while (!dataCache.isEmpty()) + U.sleep(100); + + checkEvictionPolicy(0, 0); + } + finally { + stopAllGrids(false); + } + } + + /** + * Startup primary and secondary file systems. + * + * @throws Exception If failed. + */ + private void start() throws Exception { + startSecondary(); + startPrimary(); + + evictPlc.setMaxBlocks(0); + evictPlc.setMaxSize(0); + evictPlc.setExcludePaths(null); + } + + /** + * Test how evictions are handled for a file working in PRIMARY mode. + * + * @throws Exception If failed. + */ + public void testFilePrimary() throws Exception { + start(); + + // Create file in primary mode. It must not be propagated to eviction policy. + ggfsPrimary.create(FILE, true).close(); + + checkEvictionPolicy(0, 0); + + int blockSize = ggfsPrimary.info(FILE).blockSize(); + + append(FILE, blockSize); + + checkEvictionPolicy(0, 0); + + read(FILE, 0, blockSize); + + checkEvictionPolicy(0, 0); + } + + /** + * Test how evictions are handled for a file working in PRIMARY mode. + * + * @throws Exception If failed. + */ + public void testFileDual() throws Exception { + start(); + + ggfsPrimary.create(FILE_RMT, true).close(); + + checkEvictionPolicy(0, 0); + + int blockSize = ggfsPrimary.info(FILE_RMT).blockSize(); + + // File write. + append(FILE_RMT, blockSize); + + checkEvictionPolicy(1, blockSize); + + // One more write. + append(FILE_RMT, blockSize); + + checkEvictionPolicy(2, blockSize * 2); + + // Read. + read(FILE_RMT, 0, blockSize); + + checkEvictionPolicy(2, blockSize * 2); + } + + /** + * Ensure that a DUAL mode file is not propagated to eviction policy + * + * @throws Exception If failed. + */ + public void testFileDualExclusion() throws Exception { + start(); + + evictPlc.setExcludePaths(Collections.singleton(FILE_RMT.toString())); + + // Create file in primary mode. It must not be propagated to eviction policy. + ggfsPrimary.create(FILE_RMT, true).close(); + + checkEvictionPolicy(0, 0); + + int blockSize = ggfsPrimary.info(FILE_RMT).blockSize(); + + append(FILE_RMT, blockSize); + + checkEvictionPolicy(0, 0); + + read(FILE_RMT, 0, blockSize); + + checkEvictionPolicy(0, 0); + } + + /** + * Ensure that exception is thrown in case we are trying to rename file with one exclude setting to the file with + * another. + * + * @throws Exception If failed. + */ + public void testRenameDifferentExcludeSettings() throws Exception { + start(); + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + ggfsPrimary.rename(FILE, FILE_RMT); + + return null; + } + }, IgniteFsInvalidPathException.class, "Cannot move file to a path with different eviction exclude setting " + + "(need to copy and remove)"); + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + ggfsPrimary.rename(FILE_RMT, FILE); + + return null; + } + }, IgniteFsInvalidPathException.class, "Cannot move file to a path with different eviction exclude setting " + + "(need to copy and remove)"); + } + + /** + * Test eviction caused by too much blocks. + * + * @throws Exception If failed. + */ + public void testBlockCountEviction() throws Exception { + start(); + + int blockCnt = 3; + + evictPlc.setMaxBlocks(blockCnt); + + ggfsPrimary.create(FILE_RMT, true).close(); + + checkEvictionPolicy(0, 0); + + int blockSize = ggfsPrimary.info(FILE_RMT).blockSize(); + + // Write blocks up to the limit. + append(FILE_RMT, blockSize * blockCnt); + + checkEvictionPolicy(blockCnt, blockCnt * blockSize); + + // Write one more block what should cause eviction. + append(FILE_RMT, blockSize); + + checkEvictionPolicy(blockCnt, blockCnt * blockSize); + + // Read the first block. + read(FILE_RMT, 0, blockSize); + + checkEvictionPolicy(blockCnt, blockCnt * blockSize); + checkMetrics(1, 1); + } + + /** + * Test eviction caused by too big data size. + * + * @throws Exception If failed. + */ + public void testDataSizeEviction() throws Exception { + start(); + + ggfsPrimary.create(FILE_RMT, true).close(); + + int blockCnt = 3; + int blockSize = ggfsPrimary.info(FILE_RMT).blockSize(); + + evictPlc.setMaxSize(blockSize * blockCnt); + + // Write blocks up to the limit. + append(FILE_RMT, blockSize * blockCnt); + + checkEvictionPolicy(blockCnt, blockCnt * blockSize); + + // Reset metrics. + ggfsPrimary.resetMetrics(); + + // Read the first block what should cause reordering. + read(FILE_RMT, 0, blockSize); + + checkMetrics(1, 0); + checkEvictionPolicy(blockCnt, blockCnt * blockSize); + + // Write one more block what should cause eviction of the block 2. + append(FILE_RMT, blockSize); + + checkEvictionPolicy(blockCnt, blockCnt * blockSize); + + // Read the first block. + read(FILE_RMT, 0, blockSize); + + checkMetrics(2, 0); + checkEvictionPolicy(blockCnt, blockCnt * blockSize); + + // Read the second block (which was evicted). + read(FILE_RMT, blockSize, blockSize); + + checkMetrics(3, 1); + checkEvictionPolicy(blockCnt, blockCnt * blockSize); + } + + /** + * Read some data from the given file with the given offset. + * + * @param path File path. + * @param off Offset. + * @param len Length. + * @throws Exception If failed. + */ + private void read(IgniteFsPath path, int off, int len) throws Exception { + IgniteFsInputStream is = ggfsPrimary.open(path); + + is.readFully(off, new byte[len]); + + is.close(); + } + + /** + * Append some data to the given file. + * + * @param path File path. + * @param len Data length. + * @throws Exception If failed. + */ + private void append(IgniteFsPath path, int len) throws Exception { + IgniteFsOutputStream os = ggfsPrimary.append(path, false); + + os.write(new byte[len]); + + os.close(); + } + + /** + * Check metrics counters. + * + * @param blocksRead Expected blocks read. + * @param blocksReadRmt Expected blocks read remote. + * @throws Exception If failed. + */ + public void checkMetrics(final long blocksRead, final long blocksReadRmt) throws Exception { + assert GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + try { + IgniteFsMetrics metrics = ggfsPrimary.metrics(); + + return metrics.blocksReadTotal() == blocksRead && metrics.blocksReadRemote() == blocksReadRmt; + } + catch (IgniteCheckedException e) { + throw new RuntimeException(e); + } + } + }, 5000) : "Unexpected metrics [expectedBlocksReadTotal=" + blocksRead + ", actualBlocksReadTotal=" + + ggfsPrimary.metrics().blocksReadTotal() + ", expectedBlocksReadRemote=" + blocksReadRmt + + ", actualBlocksReadRemote=" + ggfsPrimary.metrics().blocksReadRemote() + ']'; + } + + /** + * Check eviction policy state. + * + * @param curBlocks Current blocks. + * @param curBytes Current bytes. + */ + private void checkEvictionPolicy(final int curBlocks, final long curBytes) throws IgniteInterruptedException { + assert GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return evictPlc.getCurrentBlocks() == curBlocks && evictPlc.getCurrentSize() == curBytes; + } + }, 5000) : "Unexpected counts [expectedBlocks=" + curBlocks + ", actualBlocks=" + evictPlc.getCurrentBlocks() + + ", expectedBytes=" + curBytes + ", currentBytes=" + curBytes + ']'; + } +}
