// http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsMetricsAdapter.java
// ----------------------------------------------------------------------
// diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsMetricsAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsMetricsAdapter.java
// deleted file mode 100644
// index 0669aec..0000000
// --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsMetricsAdapter.java
// +++ /dev/null
// @@ -1,239 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.fs;

import org.apache.ignite.igfs.*;
import org.apache.ignite.internal.util.typedef.internal.*;

import java.io.*;

/**
 * Plain value holder for GGFS metrics.
 * <p>
 * All values are supplied through the constructor (or restored by {@link #readExternal(ObjectInput)})
 * and exposed through the {@link IgfsMetrics} accessors. The fields are written and read in the same
 * fixed order by {@link #writeExternal(ObjectOutput)} and {@link #readExternal(ObjectInput)}.
 */
public class IgfsMetricsAdapter implements IgfsMetrics, Externalizable {
    /** Serial version UID. */
    private static final long serialVersionUID = 0L;

    /** Space used on the local node. */
    private long locSpaceSize;

    /** Maximum space. */
    private long maxSpaceSize;

    /** Space used in the secondary file system. */
    private long secondarySpaceSize;

    /** Directories count. */
    private int dirsCnt;

    /** Files count. */
    private int filesCnt;

    /** Count of files currently opened for read. */
    private int filesOpenedForRead;

    /** Count of files currently opened for write. */
    private int filesOpenedForWrite;

    /** Blocks read in total. */
    private long blocksReadTotal;

    /** Blocks read remotely. */
    private long blocksReadRmt;

    /** Blocks written in total. */
    private long blocksWrittenTotal;

    /** Blocks written remotely. */
    private long blocksWrittenRmt;

    /** Bytes read in total. */
    private long bytesRead;

    /** Total time spent reading bytes. */
    private long bytesReadTime;

    /** Bytes written in total. */
    private long bytesWritten;

    /** Total time spent writing bytes. */
    private long bytesWriteTime;

    /**
     * Public no-arg constructor required by {@link Externalizable}.
     */
    public IgfsMetricsAdapter() {
        // No-op.
    }

    /**
     * Creates a fully populated metrics holder.
     *
     * @param locSpaceSize Used space on local node.
     * @param maxSpaceSize Maximum space size.
     * @param secondarySpaceSize Secondary space size.
     * @param dirsCnt Number of directories.
     * @param filesCnt Number of files.
     * @param filesOpenedForRead Number of files opened for read.
     * @param filesOpenedForWrite Number of files opened for write.
     * @param blocksReadTotal Total blocks read.
     * @param blocksReadRmt Total blocks read remotely.
     * @param blocksWrittenTotal Total blocks written.
     * @param blocksWrittenRmt Total blocks written remotely.
     * @param bytesRead Total bytes read.
     * @param bytesReadTime Total bytes read time.
     * @param bytesWritten Total bytes written.
     * @param bytesWriteTime Total bytes write time.
     */
    public IgfsMetricsAdapter(long locSpaceSize, long maxSpaceSize, long secondarySpaceSize, int dirsCnt,
        int filesCnt, int filesOpenedForRead, int filesOpenedForWrite, long blocksReadTotal, long blocksReadRmt,
        long blocksWrittenTotal, long blocksWrittenRmt, long bytesRead, long bytesReadTime, long bytesWritten,
        long bytesWriteTime) {
        // Space figures.
        this.locSpaceSize = locSpaceSize;
        this.maxSpaceSize = maxSpaceSize;
        this.secondarySpaceSize = secondarySpaceSize;

        // Entry counters.
        this.dirsCnt = dirsCnt;
        this.filesCnt = filesCnt;
        this.filesOpenedForRead = filesOpenedForRead;
        this.filesOpenedForWrite = filesOpenedForWrite;

        // Block counters.
        this.blocksReadTotal = blocksReadTotal;
        this.blocksReadRmt = blocksReadRmt;
        this.blocksWrittenTotal = blocksWrittenTotal;
        this.blocksWrittenRmt = blocksWrittenRmt;

        // Byte counters and timings.
        this.bytesRead = bytesRead;
        this.bytesReadTime = bytesReadTime;
        this.bytesWritten = bytesWritten;
        this.bytesWriteTime = bytesWriteTime;
    }

    /** @return Used space on local node. */
    @Override public long localSpaceSize() {
        return this.locSpaceSize;
    }

    /** @return Maximum space size. */
    @Override public long maxSpaceSize() {
        return this.maxSpaceSize;
    }

    /** @return Secondary file system used space. */
    @Override public long secondarySpaceSize() {
        return this.secondarySpaceSize;
    }

    /** @return Number of directories. */
    @Override public int directoriesCount() {
        return this.dirsCnt;
    }

    /** @return Number of files. */
    @Override public int filesCount() {
        return this.filesCnt;
    }

    /** @return Number of files opened for read. */
    @Override public int filesOpenedForRead() {
        return this.filesOpenedForRead;
    }

    /** @return Number of files opened for write. */
    @Override public int filesOpenedForWrite() {
        return this.filesOpenedForWrite;
    }

    /** @return Total blocks read. */
    @Override public long blocksReadTotal() {
        return this.blocksReadTotal;
    }

    /** @return Total blocks read remotely. */
    @Override public long blocksReadRemote() {
        return this.blocksReadRmt;
    }

    /** @return Total blocks written. */
    @Override public long blocksWrittenTotal() {
        return this.blocksWrittenTotal;
    }

    /** @return Total blocks written remotely. */
    @Override public long blocksWrittenRemote() {
        return this.blocksWrittenRmt;
    }

    /** @return Total bytes read. */
    @Override public long bytesRead() {
        return this.bytesRead;
    }

    /** @return Total bytes read time. */
    @Override public long bytesReadTime() {
        return this.bytesReadTime;
    }

    /** @return Total bytes written. */
    @Override public long bytesWritten() {
        return this.bytesWritten;
    }

    /** @return Total bytes write time. */
    @Override public long bytesWriteTime() {
        return this.bytesWriteTime;
    }

    /**
     * Writes all metric values in declaration order. The order must stay in sync with
     * {@link #readExternal(ObjectInput)}.
     *
     * @param out Output to write to.
     * @throws IOException If writing fails.
     */
    @Override public void writeExternal(ObjectOutput out) throws IOException {
        out.writeLong(this.locSpaceSize);
        out.writeLong(this.maxSpaceSize);
        out.writeLong(this.secondarySpaceSize);

        out.writeInt(this.dirsCnt);
        out.writeInt(this.filesCnt);
        out.writeInt(this.filesOpenedForRead);
        out.writeInt(this.filesOpenedForWrite);

        out.writeLong(this.blocksReadTotal);
        out.writeLong(this.blocksReadRmt);
        out.writeLong(this.blocksWrittenTotal);
        out.writeLong(this.blocksWrittenRmt);

        out.writeLong(this.bytesRead);
        out.writeLong(this.bytesReadTime);
        out.writeLong(this.bytesWritten);
        out.writeLong(this.bytesWriteTime);
    }

    /**
     * Restores all metric values in the exact order used by {@link #writeExternal(ObjectOutput)}.
     *
     * @param in Input to read from.
     * @throws IOException If reading fails.
     */
    @Override public void readExternal(ObjectInput in) throws IOException {
        this.locSpaceSize = in.readLong();
        this.maxSpaceSize = in.readLong();
        this.secondarySpaceSize = in.readLong();

        this.dirsCnt = in.readInt();
        this.filesCnt = in.readInt();
        this.filesOpenedForRead = in.readInt();
        this.filesOpenedForWrite = in.readInt();

        this.blocksReadTotal = in.readLong();
        this.blocksReadRmt = in.readLong();
        this.blocksWrittenTotal = in.readLong();
        this.blocksWrittenRmt = in.readLong();

        this.bytesRead = in.readLong();
        this.bytesReadTime = in.readLong();
        this.bytesWritten = in.readLong();
        this.bytesWriteTime = in.readLong();
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(IgfsMetricsAdapter.class, this);
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsModeResolver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsModeResolver.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsModeResolver.java deleted file mode 100644 index 0896dfd..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsModeResolver.java +++ /dev/null @@ -1,177 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.fs; - -import org.apache.ignite.igfs.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.typedef.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -/** - * - */ -public class IgfsModeResolver { - /** Maximum size of map with cached path modes. */ - private static final int MAX_PATH_CACHE = 1000; - - /** Default mode. */ - private final IgfsMode dfltMode; - - /** Modes for particular paths. Ordered from longest to shortest. 
*/ - private ArrayList<T2<IgfsPath, IgfsMode>> modes; - - /** Cached modes per path. */ - private Map<IgfsPath, IgfsMode> modesCache; - - /** Cached children modes per path. */ - private Map<IgfsPath, Set<IgfsMode>> childrenModesCache; - - /** - * @param dfltMode Default GGFS mode. - * @param modes List of configured modes. - */ - public IgfsModeResolver(IgfsMode dfltMode, @Nullable List<T2<IgfsPath, IgfsMode>> modes) { - assert dfltMode != null; - - this.dfltMode = dfltMode; - - if (modes != null) { - ArrayList<T2<IgfsPath, IgfsMode>> modes0 = new ArrayList<>(modes); - - // Sort paths, longest first. - Collections.sort(modes0, new Comparator<Map.Entry<IgfsPath, IgfsMode>>() { - @Override public int compare(Map.Entry<IgfsPath, IgfsMode> o1, - Map.Entry<IgfsPath, IgfsMode> o2) { - return o2.getKey().components().size() - o1.getKey().components().size(); - } - }); - - this.modes = modes0; - - modesCache = new GridBoundedConcurrentLinkedHashMap<>(MAX_PATH_CACHE); - childrenModesCache = new GridBoundedConcurrentLinkedHashMap<>(MAX_PATH_CACHE); - } - } - - /** - * Resolves GGFS mode for the given path. - * - * @param path GGFS path. - * @return GGFS mode. - */ - public IgfsMode resolveMode(IgfsPath path) { - assert path != null; - - if (modes == null) - return dfltMode; - else { - IgfsMode mode = modesCache.get(path); - - if (mode == null) { - for (T2<IgfsPath, IgfsMode> entry : modes) { - if (startsWith(path, entry.getKey())) { - // As modes ordered from most specific to least specific first mode found is ours. - mode = entry.getValue(); - - break; - } - } - - if (mode == null) - mode = dfltMode; - - modesCache.put(path, mode); - } - - return mode; - } - } - - /** - * @param path Path. - * @return Set of all modes that children paths could have. 
- */ - public Set<IgfsMode> resolveChildrenModes(IgfsPath path) { - assert path != null; - - if (modes == null) - return Collections.singleton(dfltMode); - else { - Set<IgfsMode> children = childrenModesCache.get(path); - - if (children == null) { - children = new HashSet<>(IgfsMode.values().length, 1.0f); - - IgfsMode pathDefault = dfltMode; - - for (T2<IgfsPath, IgfsMode> child : modes) { - if (startsWith(path, child.getKey())) { - pathDefault = child.getValue(); - - break; - } - else if (startsWith(child.getKey(), path)) - children.add(child.getValue()); - } - - children.add(pathDefault); - - childrenModesCache.put(path, children); - } - - return children; - } - } - - /** - * @return Unmodifiable copy of properly ordered modes prefixes - * or {@code null} if no modes set. - */ - @Nullable public List<T2<IgfsPath, IgfsMode>> modesOrdered() { - return modes != null ? Collections.unmodifiableList(modes) : null; - } - - /** - * Check if path starts with prefix. - * - * @param path Path. - * @param prefix Prefix. - * @return {@code true} if path starts with prefix, {@code false} if not. - */ - private static boolean startsWith(IgfsPath path, IgfsPath prefix) { - List<String> p1Comps = path.components(); - List<String> p2Comps = prefix.components(); - - if (p2Comps.size() > p1Comps.size()) - return false; - - for (int i = 0; i < p1Comps.size(); i++) { - if (i >= p2Comps.size() || p2Comps.get(i) == null) - // All prefix components already matched. - return true; - - if (!p1Comps.get(i).equals(p2Comps.get(i))) - return false; - } - - // Path and prefix components had same length and all of them matched. 
- return true; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsNoopHelper.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsNoopHelper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsNoopHelper.java deleted file mode 100644 index 8d47778..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsNoopHelper.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.fs; - -import org.apache.ignite.*; -import org.apache.ignite.configuration.*; - -/** - * No-op utils processor adapter. - */ -public class IgfsNoopHelper implements IgfsHelper { - /** {@inheritDoc} */ - @Override public void preProcessCacheConfiguration(CacheConfiguration cfg) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void validateCacheConfiguration(CacheConfiguration cfg) throws IgniteCheckedException { - // No-op. 
- } - - /** {@inheritDoc} */ - @Override public boolean isGgfsBlockKey(Object key) { - return false; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsNoopProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsNoopProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsNoopProcessor.java deleted file mode 100644 index fcc0653..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsNoopProcessor.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.fs; - -import org.apache.ignite.*; -import org.apache.ignite.compute.*; -import org.apache.ignite.igfs.*; -import org.apache.ignite.igfs.mapreduce.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.util.ipc.*; -import org.apache.ignite.internal.util.typedef.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -/** - * Nop Ignite file system processor implementation. 
- */ -public class IgfsNoopProcessor extends IgfsProcessorAdapter { - /** - * Constructor. - * - * @param ctx Kernal context. - */ - public IgfsNoopProcessor(GridKernalContext ctx) { - super(ctx); - } - - /** {@inheritDoc} */ - @Override public void printMemoryStats() { - X.println(">>>"); - X.println(">>> GGFS processor memory stats [grid=" + ctx.gridName() + ']'); - X.println(">>> ggfsCacheSize: " + 0); - } - - /** {@inheritDoc} */ - @Override public Collection<IgniteFs> ggfss() { - return Collections.emptyList(); - } - - /** {@inheritDoc} */ - @Nullable @Override public IgniteFs ggfs(@Nullable String name) { - return null; - } - - /** {@inheritDoc} */ - @Override public Collection<IpcServerEndpoint> endpoints(@Nullable String name) { - return Collections.emptyList(); - } - - /** {@inheritDoc} */ - @Nullable @Override public ComputeJob createJob(IgfsJob job, @Nullable String ggfsName, IgfsPath path, - long start, long length, IgfsRecordResolver recRslv) { - return null; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsOutputStreamAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsOutputStreamAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsOutputStreamAdapter.java deleted file mode 100644 index cb8f2aa..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsOutputStreamAdapter.java +++ /dev/null @@ -1,263 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.fs; - -import org.apache.ignite.*; -import org.apache.ignite.igfs.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.nio.*; - -/** - * Output stream to store data into grid cache with separate blocks. - */ -@SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext") -abstract class IgfsOutputStreamAdapter extends IgfsOutputStream { - /** Path to file. */ - protected final IgfsPath path; - - /** Buffer size. */ - private final int bufSize; - - /** Flag for this stream open/closed state. */ - private boolean closed; - - /** Local buffer to store stream data as consistent block. */ - private ByteBuffer buf; - - /** Bytes written. */ - @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") - protected long bytes; - - /** Time consumed by write operations. */ - protected long time; - - /** - * Constructs file output stream. - * - * @param path Path to stored file. - * @param bufSize The size of the buffer to be used. - */ - IgfsOutputStreamAdapter(IgfsPath path, int bufSize) { - assert path != null; - assert bufSize > 0; - - this.path = path; - this.bufSize = bufSize; - } - - /** - * Gets number of written bytes. - * - * @return Written bytes. 
- */ - public long bytes() { - return bytes; - } - - /** {@inheritDoc} */ - @Override public synchronized void write(int b) throws IOException { - checkClosed(null, 0); - - long startTime = System.nanoTime(); - - b &= 0xFF; - - if (buf == null) - buf = ByteBuffer.allocate(bufSize); - - buf.put((byte)b); - - if (buf.position() >= bufSize) - sendData(true); // Send data to server. - - time += System.nanoTime() - startTime; - } - - /** {@inheritDoc} */ - @Override public synchronized void write(byte[] b, int off, int len) throws IOException { - A.notNull(b, "b"); - - if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) { - throw new IndexOutOfBoundsException("Invalid bounds [data.length=" + b.length + ", offset=" + off + - ", length=" + len + ']'); - } - - checkClosed(null, 0); - - if (len == 0) - return; // Done. - - long startTime = System.nanoTime(); - - if (buf == null) { - // Do not allocate and copy byte buffer if will send data immediately. - if (len >= bufSize) { - buf = ByteBuffer.wrap(b, off, len); - - sendData(false); - - return; - } - - buf = ByteBuffer.allocate(Math.max(bufSize, len)); - } - - if (buf.remaining() < len) - // Expand buffer capacity, if remaining size is less then data size. - buf = ByteBuffer.allocate(buf.position() + len).put((ByteBuffer)buf.flip()); - - assert len <= buf.remaining() : "Expects write data size less or equal then remaining buffer capacity " + - "[len=" + len + ", buf.remaining=" + buf.remaining() + ']'; - - buf.put(b, off, len); - - if (buf.position() >= bufSize) - sendData(true); // Send data to server. - - time += System.nanoTime() - startTime; - } - - /** {@inheritDoc} */ - @Override public synchronized void transferFrom(DataInput in, int len) throws IOException { - checkClosed(in, len); - - long startTime = System.nanoTime(); - - // Send all IPC data from the local buffer before streaming. 
- if (buf != null && buf.position() > 0) - sendData(true); - - try { - storeDataBlocks(in, len); - } - catch (IgniteCheckedException e) { - throw new IOException(e.getMessage(), e); - } - - time += System.nanoTime() - startTime; - } - - /** - * Flushes this output stream and forces any buffered output bytes to be written out. - * - * @exception IOException if an I/O error occurs. - */ - @Override public synchronized void flush() throws IOException { - checkClosed(null, 0); - - // Send all IPC data from the local buffer. - if (buf != null && buf.position() > 0) - sendData(true); - } - - /** {@inheritDoc} */ - @Override public final synchronized void close() throws IOException { - // Do nothing if stream is already closed. - if (closed) - return; - - try { - // Send all IPC data from the local buffer. - try { - flush(); - } - finally { - onClose(); // "onClose()" routine must be invoked anyway! - } - } - finally { - // Mark this stream closed AFTER flush. - closed = true; - } - } - - /** - * Store data blocks in file.<br/> - * Note! If file concurrently deleted we'll get lost blocks. - * - * @param data Data to store. - * @throws IgniteCheckedException If failed. - */ - protected abstract void storeDataBlock(ByteBuffer data) throws IgniteCheckedException, IOException; - - /** - * Store data blocks in file reading appropriate number of bytes from given data input. - * - * @param in Data input to read from. - * @param len Data length to store. - * @throws IgniteCheckedException If failed. - */ - protected abstract void storeDataBlocks(DataInput in, int len) throws IgniteCheckedException, IOException; - - /** - * Close callback. It will be called only once in synchronized section. - * - * @throws IOException If failed. - */ - protected void onClose() throws IOException { - // No-op. - } - - /** - * Validate this stream is open. - * - * @throws IOException If this stream is closed. 
- */ - private void checkClosed(@Nullable DataInput in, int len) throws IOException { - assert Thread.holdsLock(this); - - if (closed) { - // Must read data from stream before throwing exception. - if (in != null) - in.skipBytes(len); - - throw new IOException("Stream has been closed: " + this); - } - } - - /** - * Send all local-buffered data to server. - * - * @param flip Whether to flip buffer on sending data. We do not want to flip it if sending wrapped - * byte array. - * @throws IOException In case of IO exception. - */ - private void sendData(boolean flip) throws IOException { - assert Thread.holdsLock(this); - - try { - if (flip) - buf.flip(); - - storeDataBlock(buf); - } - catch (IgniteCheckedException e) { - throw new IOException("Failed to store data into file: " + path, e); - } - - buf = null; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(IgfsOutputStreamAdapter.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsOutputStreamImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsOutputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsOutputStreamImpl.java deleted file mode 100644 index eec5efc..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsOutputStreamImpl.java +++ /dev/null @@ -1,505 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.fs; - -import org.apache.ignite.*; -import org.apache.ignite.igfs.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.task.*; -import org.apache.ignite.internal.util.future.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.nio.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.igfs.IgfsMode.*; - -/** - * Output stream to store data into grid cache with separate blocks. - */ -class IgfsOutputStreamImpl extends IgfsOutputStreamAdapter { - /** Maximum number of blocks in buffer. */ - private static final int MAX_BLOCKS_CNT = 16; - - /** GGFS context. */ - private IgfsContext ggfsCtx; - - /** Meta info manager. */ - private final IgfsMetaManager meta; - - /** Data manager. */ - private final IgfsDataManager data; - - /** File descriptor. */ - @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") - private IgfsFileInfo fileInfo; - - /** Parent ID. */ - private final IgniteUuid parentId; - - /** File name. */ - private final String fileName; - - /** Space in file to write data. */ - @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") - private long space; - - /** Intermediate remainder to keep data. */ - private byte[] remainder; - - /** Data length in remainder. */ - private int remainderDataLen; - - /** Write completion future. 
*/ - private final IgniteInternalFuture<Boolean> writeCompletionFut; - - /** GGFS mode. */ - private final IgfsMode mode; - - /** File worker batch. */ - private final IgfsFileWorkerBatch batch; - - /** Ensures that onClose)_ routine is called no more than once. */ - private final AtomicBoolean onCloseGuard = new AtomicBoolean(); - - /** Local GGFS metrics. */ - private final IgfsLocalMetrics metrics; - - /** Affinity written by this output stream. */ - private IgfsFileAffinityRange streamRange; - - /** - * Constructs file output stream. - * - * @param ggfsCtx GGFS context. - * @param path Path to stored file. - * @param fileInfo File info to write binary data to. - * @param bufSize The size of the buffer to be used. - * @param mode Grid GGFS mode. - * @param batch Optional secondary file system batch. - * @param metrics Local GGFs metrics. - * @throws IgniteCheckedException If stream creation failed. - */ - IgfsOutputStreamImpl(IgfsContext ggfsCtx, IgfsPath path, IgfsFileInfo fileInfo, IgniteUuid parentId, - int bufSize, IgfsMode mode, @Nullable IgfsFileWorkerBatch batch, IgfsLocalMetrics metrics) - throws IgniteCheckedException { - super(path, optimizeBufferSize(bufSize, fileInfo)); - - assert fileInfo != null; - assert fileInfo.isFile() : "Unexpected file info: " + fileInfo; - assert mode != null && mode != PROXY; - assert mode == PRIMARY && batch == null || batch != null; - assert metrics != null; - - // File hasn't been locked. - if (fileInfo.lockId() == null) - throw new IgfsException("Failed to acquire file lock (concurrently modified?): " + path); - - this.ggfsCtx = ggfsCtx; - meta = ggfsCtx.meta(); - data = ggfsCtx.data(); - - this.fileInfo = fileInfo; - this.mode = mode; - this.batch = batch; - this.parentId = parentId; - this.metrics = metrics; - - streamRange = initialStreamRange(fileInfo); - - fileName = path.name(); - - writeCompletionFut = data.writeStart(fileInfo); - } - - /** - * Optimize buffer size. - * - * @param bufSize Requested buffer size. 
- * @param fileInfo File info. - * @return Optimized buffer size. - */ - @SuppressWarnings("IfMayBeConditional") - private static int optimizeBufferSize(int bufSize, IgfsFileInfo fileInfo) { - assert bufSize > 0; - - if (fileInfo == null) - return bufSize; - - int blockSize = fileInfo.blockSize(); - - if (blockSize <= 0) - return bufSize; - - if (bufSize <= blockSize) - // Optimize minimum buffer size to be equal file's block size. - return blockSize; - - int maxBufSize = blockSize * MAX_BLOCKS_CNT; - - if (bufSize > maxBufSize) - // There is no profit or optimization from larger buffers. - return maxBufSize; - - if (fileInfo.length() == 0) - // Make buffer size multiple of block size (optimized for new files). - return bufSize / blockSize * blockSize; - - return bufSize; - } - - /** {@inheritDoc} */ - @Override protected synchronized void storeDataBlock(ByteBuffer block) throws IgniteCheckedException, IOException { - int writeLen = block.remaining(); - - preStoreDataBlocks(null, writeLen); - - int blockSize = fileInfo.blockSize(); - - // If data length is not enough to fill full block, fill the remainder and return. - if (remainderDataLen + writeLen < blockSize) { - if (remainder == null) - remainder = new byte[blockSize]; - else if (remainder.length != blockSize) { - assert remainderDataLen == remainder.length; - - byte[] allocated = new byte[blockSize]; - - U.arrayCopy(remainder, 0, allocated, 0, remainder.length); - - remainder = allocated; - } - - block.get(remainder, remainderDataLen, writeLen); - - remainderDataLen += writeLen; - } - else { - remainder = data.storeDataBlocks(fileInfo, fileInfo.length() + space, remainder, remainderDataLen, block, - false, streamRange, batch); - - remainderDataLen = remainder == null ? 
0 : remainder.length; - } - } - - /** {@inheritDoc} */ - @Override protected synchronized void storeDataBlocks(DataInput in, int len) throws IgniteCheckedException, IOException { - preStoreDataBlocks(in, len); - - int blockSize = fileInfo.blockSize(); - - // If data length is not enough to fill full block, fill the remainder and return. - if (remainderDataLen + len < blockSize) { - if (remainder == null) - remainder = new byte[blockSize]; - else if (remainder.length != blockSize) { - assert remainderDataLen == remainder.length; - - byte[] allocated = new byte[blockSize]; - - U.arrayCopy(remainder, 0, allocated, 0, remainder.length); - - remainder = allocated; - } - - in.readFully(remainder, remainderDataLen, len); - - remainderDataLen += len; - } - else { - remainder = data.storeDataBlocks(fileInfo, fileInfo.length() + space, remainder, remainderDataLen, in, len, - false, streamRange, batch); - - remainderDataLen = remainder == null ? 0 : remainder.length; - } - } - - /** - * Initializes data loader if it was not initialized yet and updates written space. - * - * @param len Data length to be written. - */ - private void preStoreDataBlocks(@Nullable DataInput in, int len) throws IgniteCheckedException, IOException { - // Check if any exception happened while writing data. - if (writeCompletionFut.isDone()) { - assert ((GridFutureAdapter)writeCompletionFut).isFailed(); - - if (in != null) - in.skipBytes(len); - - writeCompletionFut.get(); - } - - bytes += len; - space += len; - } - - /** - * Flushes this output stream and forces any buffered output bytes to be written out. - * - * @exception IOException if an I/O error occurs. - */ - @Override public synchronized void flush() throws IOException { - boolean exists; - - try { - exists = meta.exists(fileInfo.id()); - } - catch (IgniteCheckedException e) { - throw new IOError(e); // Something unrecoverable. 
- } - - if (!exists) { - onClose(true); - - throw new IOException("File was concurrently deleted: " + path); - } - - super.flush(); - - try { - if (remainder != null) { - data.storeDataBlocks(fileInfo, fileInfo.length() + space, null, 0, - ByteBuffer.wrap(remainder, 0, remainderDataLen), true, streamRange, batch); - - remainder = null; - remainderDataLen = 0; - } - - if (space > 0) { - IgfsFileInfo fileInfo0 = meta.updateInfo(fileInfo.id(), - new ReserveSpaceClosure(space, streamRange)); - - if (fileInfo0 == null) - throw new IOException("File was concurrently deleted: " + path); - else - fileInfo = fileInfo0; - - streamRange = initialStreamRange(fileInfo); - - space = 0; - } - } - catch (IgniteCheckedException e) { - throw new IOException("Failed to flush data [path=" + path + ", space=" + space + ']', e); - } - } - - /** {@inheritDoc} */ - @Override protected void onClose() throws IOException { - onClose(false); - } - - /** - * Close callback. It will be called only once in synchronized section. - * - * @param deleted Whether we already know that the file was deleted. - * @throws IOException If failed. - */ - private void onClose(boolean deleted) throws IOException { - assert Thread.holdsLock(this); - - if (onCloseGuard.compareAndSet(false, true)) { - // Notify backing secondary file system batch to finish. - if (mode != PRIMARY) { - assert batch != null; - - batch.finish(); - } - - // Ensure file existence. - boolean exists; - - try { - exists = !deleted && meta.exists(fileInfo.id()); - } - catch (IgniteCheckedException e) { - throw new IOError(e); // Something unrecoverable. - } - - if (exists) { - IOException err = null; - - try { - data.writeClose(fileInfo); - - writeCompletionFut.get(); - } - catch (IgniteCheckedException e) { - err = new IOException("Failed to close stream [path=" + path + ", fileInfo=" + fileInfo + ']', e); - } - - metrics.addWrittenBytesTime(bytes, time); - - // Await secondary file system processing to finish. 
- if (mode == DUAL_SYNC) { - try { - batch.await(); - } - catch (IgniteCheckedException e) { - if (err == null) - err = new IOException("Failed to close secondary file system stream [path=" + path + - ", fileInfo=" + fileInfo + ']', e); - } - } - - long modificationTime = System.currentTimeMillis(); - - try { - meta.unlock(fileInfo, modificationTime); - } - catch (IgfsFileNotFoundException ignore) { - data.delete(fileInfo); // Safety to ensure that all data blocks are deleted. - - throw new IOException("File was concurrently deleted: " + path); - } - catch (IgniteCheckedException e) { - throw new IOError(e); // Something unrecoverable. - } - - meta.updateParentListingAsync(parentId, fileInfo.id(), fileName, bytes, modificationTime); - - if (err != null) - throw err; - } - else { - try { - if (mode == DUAL_SYNC) - batch.await(); - } - catch (IgniteCheckedException e) { - throw new IOException("Failed to close secondary file system stream [path=" + path + - ", fileInfo=" + fileInfo + ']', e); - } - finally { - data.delete(fileInfo); - } - } - } - } - - /** - * Gets initial affinity range. This range will have 0 length and will start from first - * non-occupied file block. - * - * @param fileInfo File info to build initial range for. - * @return Affinity range. - */ - private IgfsFileAffinityRange initialStreamRange(IgfsFileInfo fileInfo) { - if (!ggfsCtx.configuration().isFragmentizerEnabled()) - return null; - - if (!Boolean.parseBoolean(fileInfo.properties().get(IgniteFs.PROP_PREFER_LOCAL_WRITES))) - return null; - - int blockSize = fileInfo.blockSize(); - - // Find first non-occupied block offset. - long off = ((fileInfo.length() + blockSize - 1) / blockSize) * blockSize; - - // Need to get last affinity key and reuse it if we are on the same node. - long lastBlockOff = off - fileInfo.blockSize(); - - if (lastBlockOff < 0) - lastBlockOff = 0; - - IgfsFileMap map = fileInfo.fileMap(); - - IgniteUuid prevAffKey = map == null ? 
null : map.affinityKey(lastBlockOff, false); - - IgniteUuid affKey = data.nextAffinityKey(prevAffKey); - - return affKey == null ? null : new IgfsFileAffinityRange(off, off, affKey); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(IgfsOutputStreamImpl.class, this); - } - - /** - * Helper closure to reserve specified space and update file's length - */ - @GridInternal - private static final class ReserveSpaceClosure implements IgniteClosure<IgfsFileInfo, IgfsFileInfo>, - Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Space amount (bytes number) to increase file's length. */ - private long space; - - /** Affinity range for this particular update. */ - private IgfsFileAffinityRange range; - - /** - * Empty constructor required for {@link Externalizable}. - * - */ - public ReserveSpaceClosure() { - // No-op. - } - - /** - * Constructs the closure to reserve specified space and update file's length. - * - * @param space Space amount (bytes number) to increase file's length. - * @param range Affinity range specifying which part of file was colocated. - */ - private ReserveSpaceClosure(long space, IgfsFileAffinityRange range) { - this.space = space; - this.range = range; - } - - /** {@inheritDoc} */ - @Override public IgfsFileInfo apply(IgfsFileInfo oldInfo) { - IgfsFileMap oldMap = oldInfo.fileMap(); - - IgfsFileMap newMap = new IgfsFileMap(oldMap); - - newMap.addRange(range); - - // Update file length. 
- IgfsFileInfo updated = new IgfsFileInfo(oldInfo, oldInfo.length() + space); - - updated.fileMap(newMap); - - return updated; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeLong(space); - out.writeObject(range); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - space = in.readLong(); - range = (IgfsFileAffinityRange)in.readObject(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(ReserveSpaceClosure.class, this); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsPaths.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsPaths.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsPaths.java deleted file mode 100644 index 9687690..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsPaths.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.fs; - -import org.apache.ignite.igfs.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; - -/** - * Description of path modes. - */ -public class IgfsPaths implements Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Additional secondary file system properties. */ - private Map<String, String> props; - - /** Default GGFS mode. */ - private IgfsMode dfltMode; - - /** Path modes. */ - private List<T2<IgfsPath, IgfsMode>> pathModes; - - /** - * Empty constructor required by {@link Externalizable}. - */ - public IgfsPaths() { - // No-op. - } - - /** - * Constructor. - * - * @param props Additional secondary file system properties. - * @param dfltMode Default GGFS mode. - * @param pathModes Path modes. - */ - public IgfsPaths(Map<String, String> props, IgfsMode dfltMode, @Nullable List<T2<IgfsPath, - IgfsMode>> pathModes) { - this.props = props; - this.dfltMode = dfltMode; - this.pathModes = pathModes; - } - - /** - * @return Secondary file system properties. - */ - public Map<String, String> properties() { - return props; - } - - /** - * @return Default GGFS mode. - */ - public IgfsMode defaultMode() { - return dfltMode; - } - - /** - * @return Path modes. 
- */ - @Nullable public List<T2<IgfsPath, IgfsMode>> pathModes() { - return pathModes; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeStringMap(out, props); - U.writeEnum(out, dfltMode); - - if (pathModes != null) { - out.writeBoolean(true); - out.writeInt(pathModes.size()); - - for (T2<IgfsPath, IgfsMode> pathMode : pathModes) { - pathMode.getKey().writeExternal(out); - U.writeEnum(out, pathMode.getValue()); - } - } - else - out.writeBoolean(false); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - props = U.readStringMap(in); - dfltMode = IgfsMode.fromOrdinal(in.readByte()); - - if (in.readBoolean()) { - int size = in.readInt(); - - pathModes = new ArrayList<>(size); - - for (int i = 0; i < size; i++) { - IgfsPath path = new IgfsPath(); - path.readExternal(in); - - T2<IgfsPath, IgfsMode> entry = new T2<>(path, IgfsMode.fromOrdinal(in.readByte())); - - pathModes.add(entry); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsProcessor.java deleted file mode 100644 index 7148411..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsProcessor.java +++ /dev/null @@ -1,420 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.fs; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.cache.affinity.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.compute.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.igfs.*; -import org.apache.ignite.igfs.mapreduce.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.internal.util.ipc.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.jdk8.backport.*; -import org.jetbrains.annotations.*; - -import java.util.*; -import java.util.concurrent.*; - -import static org.apache.ignite.IgniteSystemProperties.*; -import static org.apache.ignite.cache.CacheMemoryMode.*; -import static org.apache.ignite.cache.CacheMode.*; -import static org.apache.ignite.igfs.IgfsMode.*; -import static org.apache.ignite.internal.IgniteNodeAttributes.*; - -/** - * Fully operational Ignite file system processor. - */ -public class IgfsProcessor extends IgfsProcessorAdapter { - /** Null GGFS name. */ - private static final String NULL_NAME = UUID.randomUUID().toString(); - - /** Converts context to GGFS. 
*/ - private static final IgniteClosure<IgfsContext,IgniteFs> CTX_TO_GGFS = new C1<IgfsContext, IgniteFs>() { - @Override public IgniteFs apply(IgfsContext ggfsCtx) { - return ggfsCtx.ggfs(); - } - }; - - /** */ - private final ConcurrentMap<String, IgfsContext> ggfsCache = - new ConcurrentHashMap8<>(); - - /** - * @param ctx Kernal context. - */ - public IgfsProcessor(GridKernalContext ctx) { - super(ctx); - } - - /** {@inheritDoc} */ - @Override public void start() throws IgniteCheckedException { - if (ctx.config().isDaemon()) - return; - - IgfsConfiguration[] cfgs = ctx.config().getGgfsConfiguration(); - - assert cfgs != null && cfgs.length > 0; - - validateLocalGgfsConfigurations(cfgs); - - // Start GGFS instances. - for (IgfsConfiguration cfg : cfgs) { - IgfsContext ggfsCtx = new IgfsContext( - ctx, - new IgfsConfiguration(cfg), - new IgfsMetaManager(), - new IgfsDataManager(), - new IgfsServerManager(), - new IgfsFragmentizerManager()); - - // Start managers first. - for (IgfsManager mgr : ggfsCtx.managers()) - mgr.start(ggfsCtx); - - ggfsCache.put(maskName(cfg.getName()), ggfsCtx); - } - - if (log.isDebugEnabled()) - log.debug("GGFS processor started."); - } - - /** {@inheritDoc} */ - @Override public void onKernalStart() throws IgniteCheckedException { - if (ctx.config().isDaemon()) - return; - - if (!getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) { - for (ClusterNode n : ctx.discovery().remoteNodes()) - checkGgfsOnRemoteNode(n); - } - - for (IgfsContext ggfsCtx : ggfsCache.values()) - for (IgfsManager mgr : ggfsCtx.managers()) - mgr.onKernalStart(); - } - - /** {@inheritDoc} */ - @Override public void stop(boolean cancel) { - // Stop GGFS instances. 
- for (IgfsContext ggfsCtx : ggfsCache.values()) { - if (log.isDebugEnabled()) - log.debug("Stopping ggfs: " + ggfsCtx.configuration().getName()); - - List<IgfsManager> mgrs = ggfsCtx.managers(); - - for (ListIterator<IgfsManager> it = mgrs.listIterator(mgrs.size()); it.hasPrevious();) { - IgfsManager mgr = it.previous(); - - mgr.stop(cancel); - } - - ggfsCtx.ggfs().stop(); - } - - ggfsCache.clear(); - - if (log.isDebugEnabled()) - log.debug("GGFS processor stopped."); - } - - /** {@inheritDoc} */ - @Override public void onKernalStop(boolean cancel) { - for (IgfsContext ggfsCtx : ggfsCache.values()) { - if (log.isDebugEnabled()) - log.debug("Stopping ggfs: " + ggfsCtx.configuration().getName()); - - List<IgfsManager> mgrs = ggfsCtx.managers(); - - for (ListIterator<IgfsManager> it = mgrs.listIterator(mgrs.size()); it.hasPrevious();) { - IgfsManager mgr = it.previous(); - - mgr.onKernalStop(cancel); - } - } - - if (log.isDebugEnabled()) - log.debug("Finished executing GGFS processor onKernalStop() callback."); - } - - /** {@inheritDoc} */ - @Override public void printMemoryStats() { - X.println(">>>"); - X.println(">>> GGFS processor memory stats [grid=" + ctx.gridName() + ']'); - X.println(">>> ggfsCacheSize: " + ggfsCache.size()); - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public Collection<IgniteFs> ggfss() { - return F.viewReadOnly(ggfsCache.values(), CTX_TO_GGFS); - } - - /** {@inheritDoc} */ - @Override @Nullable public IgniteFs ggfs(@Nullable String name) { - IgfsContext ggfsCtx = ggfsCache.get(maskName(name)); - - return ggfsCtx == null ? null : ggfsCtx.ggfs(); - } - - /** {@inheritDoc} */ - @Override @Nullable public Collection<IpcServerEndpoint> endpoints(@Nullable String name) { - IgfsContext ggfsCtx = ggfsCache.get(maskName(name)); - - return ggfsCtx == null ? 
Collections.<IpcServerEndpoint>emptyList() : ggfsCtx.server().endpoints(); - } - - /** {@inheritDoc} */ - @Nullable @Override public ComputeJob createJob(IgfsJob job, @Nullable String ggfsName, IgfsPath path, - long start, long len, IgfsRecordResolver recRslv) { - return new IgfsJobImpl(job, ggfsName, path, start, len, recRslv); - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public void addAttributes(Map<String, Object> attrs) throws IgniteCheckedException { - super.addAttributes(attrs); - - IgniteConfiguration gridCfg = ctx.config(); - - // Node doesn't have GGFS if it: - // is daemon; - // doesn't have configured GGFS; - // doesn't have configured caches. - if (gridCfg.isDaemon() || F.isEmpty(gridCfg.getGgfsConfiguration()) || - F.isEmpty(gridCfg.getCacheConfiguration())) - return; - - final Map<String, CacheConfiguration> cacheCfgs = new HashMap<>(); - - F.forEach(gridCfg.getCacheConfiguration(), new CI1<CacheConfiguration>() { - @Override public void apply(CacheConfiguration c) { - cacheCfgs.put(c.getName(), c); - } - }); - - Collection<IgfsAttributes> attrVals = new ArrayList<>(); - - assert gridCfg.getGgfsConfiguration() != null; - - for (IgfsConfiguration ggfsCfg : gridCfg.getGgfsConfiguration()) { - CacheConfiguration cacheCfg = cacheCfgs.get(ggfsCfg.getDataCacheName()); - - if (cacheCfg == null) - continue; // No cache for the given GGFS configuration. - - CacheAffinityKeyMapper affMapper = cacheCfg.getAffinityMapper(); - - if (!(affMapper instanceof IgfsGroupDataBlocksKeyMapper)) - // Do not create GGFS attributes for such a node nor throw error about invalid configuration. - // Configuration will be validated later, while starting GridGgfsProcessor. 
- continue; - - attrVals.add(new IgfsAttributes( - ggfsCfg.getName(), - ggfsCfg.getBlockSize(), - ((IgfsGroupDataBlocksKeyMapper)affMapper).groupSize(), - ggfsCfg.getMetaCacheName(), - ggfsCfg.getDataCacheName(), - ggfsCfg.getDefaultMode(), - ggfsCfg.getPathModes(), - ggfsCfg.isFragmentizerEnabled())); - } - - attrs.put(ATTR_GGFS, attrVals.toArray(new IgfsAttributes[attrVals.size()])); - } - - /** - * @param name Cache name. - * @return Masked name accounting for {@code nulls}. - */ - private String maskName(@Nullable String name) { - return name == null ? NULL_NAME : name; - } - - /** - * Validates local GGFS configurations. Compares attributes only for GGFSes with same name. - * @param cfgs GGFS configurations - * @throws IgniteCheckedException If any of GGFS configurations is invalid. - */ - private void validateLocalGgfsConfigurations(IgfsConfiguration[] cfgs) throws IgniteCheckedException { - Collection<String> cfgNames = new HashSet<>(); - - for (IgfsConfiguration cfg : cfgs) { - String name = cfg.getName(); - - if (cfgNames.contains(name)) - throw new IgniteCheckedException("Duplicate GGFS name found (check configuration and " + - "assign unique name to each): " + name); - - GridCacheAdapter<Object, Object> dataCache = ctx.cache().internalCache(cfg.getDataCacheName()); - - if (dataCache == null) - throw new IgniteCheckedException("Data cache is not configured locally for GGFS: " + cfg); - - if (dataCache.configuration().isQueryIndexEnabled()) - throw new IgniteCheckedException("GGFS data cache cannot start with enabled query indexing."); - - GridCache<Object, Object> metaCache = ctx.cache().cache(cfg.getMetaCacheName()); - - if (metaCache == null) - throw new IgniteCheckedException("Metadata cache is not configured locally for GGFS: " + cfg); - - if (metaCache.configuration().isQueryIndexEnabled()) - throw new IgniteCheckedException("GGFS metadata cache cannot start with enabled query indexing."); - - if (F.eq(cfg.getDataCacheName(), cfg.getMetaCacheName())) 
- throw new IgniteCheckedException("Cannot use same cache as both data and meta cache: " + cfg.getName()); - - if (!(dataCache.configuration().getAffinityMapper() instanceof IgfsGroupDataBlocksKeyMapper)) - throw new IgniteCheckedException("Invalid GGFS data cache configuration (key affinity mapper class should be " + - IgfsGroupDataBlocksKeyMapper.class.getSimpleName() + "): " + cfg); - - long maxSpaceSize = cfg.getMaxSpaceSize(); - - if (maxSpaceSize > 0) { - // Max space validation. - long maxHeapSize = Runtime.getRuntime().maxMemory(); - long offHeapSize = dataCache.configuration().getOffHeapMaxMemory(); - - if (offHeapSize < 0 && maxSpaceSize > maxHeapSize) - // Offheap is disabled. - throw new IgniteCheckedException("Maximum GGFS space size cannot be greater that size of available heap " + - "memory [maxHeapSize=" + maxHeapSize + ", maxGgfsSpaceSize=" + maxSpaceSize + ']'); - else if (offHeapSize > 0 && maxSpaceSize > maxHeapSize + offHeapSize) - // Offheap is enabled, but limited. 
- throw new IgniteCheckedException("Maximum GGFS space size cannot be greater than size of available heap " + - "memory and offheap storage [maxHeapSize=" + maxHeapSize + ", offHeapSize=" + offHeapSize + - ", maxGgfsSpaceSize=" + maxSpaceSize + ']'); - } - - if (dataCache.configuration().getCacheMode() == PARTITIONED) { - int backups = dataCache.configuration().getBackups(); - - if (backups != 0) - throw new IgniteCheckedException("GGFS data cache cannot be used with backups (set backup count " + - "to 0 and restart the grid): " + cfg.getDataCacheName()); - } - - if (cfg.getMaxSpaceSize() == 0 && dataCache.configuration().getMemoryMode() == OFFHEAP_VALUES) - U.warn(log, "GGFS max space size is not specified but data cache values are stored off-heap (max " + - "space will be limited to 80% of max JVM heap size): " + cfg.getName()); - - boolean secondary = cfg.getDefaultMode() == PROXY; - - if (cfg.getPathModes() != null) { - for (Map.Entry<String, IgfsMode> mode : cfg.getPathModes().entrySet()) { - if (mode.getValue() == PROXY) - secondary = true; - } - } - - if (secondary) { - // When working in any mode except of primary, secondary FS config must be provided. - assertParameter(cfg.getSecondaryFileSystem() != null, - "secondaryFileSystem cannot be null when mode is SECONDARY"); - } - - cfgNames.add(name); - } - } - - /** - * Check GGFS config on remote node. - * - * @param rmtNode Remote node. - * @throws IgniteCheckedException If check failed. 
- */ - private void checkGgfsOnRemoteNode(ClusterNode rmtNode) throws IgniteCheckedException { - IgfsAttributes[] locAttrs = ctx.discovery().localNode().attribute(IgniteNodeAttributes.ATTR_GGFS); - IgfsAttributes[] rmtAttrs = rmtNode.attribute(IgniteNodeAttributes.ATTR_GGFS); - - if (F.isEmpty(locAttrs) || F.isEmpty(rmtAttrs)) - return; - - assert rmtAttrs != null && locAttrs != null; - - for (IgfsAttributes rmtAttr : rmtAttrs) - for (IgfsAttributes locAttr : locAttrs) { - // Checking the use of different caches on the different GGFSes. - if (!F.eq(rmtAttr.ggfsName(), locAttr.ggfsName())) { - if (F.eq(rmtAttr.metaCacheName(), locAttr.metaCacheName())) - throw new IgniteCheckedException("Meta cache names should be different for different GGFS instances " + - "configuration (fix configuration or set " + - "-D" + IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK + "=true system " + - "property) [metaCacheName=" + rmtAttr.metaCacheName() + - ", locNodeId=" + ctx.localNodeId() + - ", rmtNodeId=" + rmtNode.id() + - ", locGgfsName=" + locAttr.ggfsName() + - ", rmtGgfsName=" + rmtAttr.ggfsName() + ']'); - - if (F.eq(rmtAttr.dataCacheName(), locAttr.dataCacheName())) - throw new IgniteCheckedException("Data cache names should be different for different GGFS instances " + - "configuration (fix configuration or set " + - "-D" + IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK + "=true system " + - "property)[dataCacheName=" + rmtAttr.dataCacheName() + - ", locNodeId=" + ctx.localNodeId() + - ", rmtNodeId=" + rmtNode.id() + - ", locGgfsName=" + locAttr.ggfsName() + - ", rmtGgfsName=" + rmtAttr.ggfsName() + ']'); - - continue; - } - - // Compare other attributes only for GGFSes with same name. 
- checkSame("Data block size", "BlockSize", rmtNode.id(), rmtAttr.blockSize(), - locAttr.blockSize(), rmtAttr.ggfsName()); - - checkSame("Affinity mapper group size", "GrpSize", rmtNode.id(), rmtAttr.groupSize(), - locAttr.groupSize(), rmtAttr.ggfsName()); - - checkSame("Meta cache name", "MetaCacheName", rmtNode.id(), rmtAttr.metaCacheName(), - locAttr.metaCacheName(), rmtAttr.ggfsName()); - - checkSame("Data cache name", "DataCacheName", rmtNode.id(), rmtAttr.dataCacheName(), - locAttr.dataCacheName(), rmtAttr.ggfsName()); - - checkSame("Default mode", "DefaultMode", rmtNode.id(), rmtAttr.defaultMode(), - locAttr.defaultMode(), rmtAttr.ggfsName()); - - checkSame("Path modes", "PathModes", rmtNode.id(), rmtAttr.pathModes(), - locAttr.pathModes(), rmtAttr.ggfsName()); - - checkSame("Fragmentizer enabled", "FragmentizerEnabled", rmtNode.id(), rmtAttr.fragmentizerEnabled(), - locAttr.fragmentizerEnabled(), rmtAttr.ggfsName()); - } - } - - private void checkSame(String name, String propName, UUID rmtNodeId, Object rmtVal, Object locVal, String ggfsName) - throws IgniteCheckedException { - if (!F.eq(rmtVal, locVal)) - throw new IgniteCheckedException(name + " should be the same on all nodes in grid for GGFS configuration " + - "(fix configuration or set " + - "-D" + IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK + "=true system " + - "property ) [rmtNodeId=" + rmtNodeId + - ", rmt" + propName + "=" + rmtVal + - ", loc" + propName + "=" + locVal + - ", ggfName=" + ggfsName + ']'); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsProcessorAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsProcessorAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsProcessorAdapter.java deleted file mode 100644 index 8e372c4..0000000 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsProcessorAdapter.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.fs; - -import org.apache.ignite.*; -import org.apache.ignite.compute.*; -import org.apache.ignite.igfs.*; -import org.apache.ignite.igfs.mapreduce.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.*; -import org.apache.ignite.internal.util.ipc.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -/** - * Ignite file system processor adapter. - */ -public abstract class IgfsProcessorAdapter extends GridProcessorAdapter { - /** - * Constructor. - * - * @param ctx Kernal context. - */ - protected IgfsProcessorAdapter(GridKernalContext ctx) { - super(ctx); - } - - /** - * Gets all GGFS instances. - * - * @return Collection of GGFS instances. - */ - public abstract Collection<IgniteFs> ggfss(); - - /** - * Gets GGFS instance. - * - * @param name (Nullable) GGFS name. - * @return GGFS instance. - */ - @Nullable public abstract IgniteFs ggfs(@Nullable String name); - - /** - * Gets server endpoints for particular GGFS. - * - * @param name GGFS name. 
- * @return Collection of endpoints or {@code null} in case GGFS is not defined. - */ - public abstract Collection<IpcServerEndpoint> endpoints(@Nullable String name); - - /** - * Create compute job for the given GGFS job. - * - * @param job GGFS job. - * @param ggfsName GGFS name. - * @param path Path. - * @param start Start position. - * @param length Length. - * @param recRslv Record resolver. - * @return Compute job. - */ - @Nullable public abstract ComputeJob createJob(IgfsJob job, @Nullable String ggfsName, IgfsPath path, - long start, long length, IgfsRecordResolver recRslv); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsSamplingKey.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsSamplingKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsSamplingKey.java deleted file mode 100644 index 6b0bdfe..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsSamplingKey.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.fs; - -import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.io.*; - -/** - * Internal key used to track if sampling enabled or disabled for particular GGFS instance. - */ -class IgfsSamplingKey implements GridCacheInternal, Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** GGFS name. */ - private String name; - - /** - * Default constructor. - * - * @param name - GGFS name. - */ - IgfsSamplingKey(String name) { - this.name = name; - } - - /** - * Empty constructor required for {@link Externalizable}. - */ - public IgfsSamplingKey() { - // No-op. - } - - /** - * @return GGFS name. - */ - public String name() { - return name; - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return name == null ? 0 : name.hashCode(); - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object obj) { - return this == obj || (obj instanceof IgfsSamplingKey && F.eq(name, ((IgfsSamplingKey)obj).name)); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeString(out, name); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException { - name = U.readString(in); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(IgfsSamplingKey.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsSecondaryInputStreamDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsSecondaryInputStreamDescriptor.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsSecondaryInputStreamDescriptor.java deleted file mode 100644 index 56208fd..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsSecondaryInputStreamDescriptor.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.fs; - -import org.apache.ignite.igfs.*; - -/** - * Descriptor of an input stream opened to the secondary file system: holds final references to the primary file system file info and the secondary file system reader. - */ -public class IgfsSecondaryInputStreamDescriptor { - /** File info in the primary file system. */ - private final IgfsFileInfo info; - - /** Secondary file system reader (input stream wrapper). */ - private final IgfsReader secReader; - - /** - * Constructor; both arguments are asserted to be non-null. - * - * @param info File info in the primary file system. - * @param secReader Secondary file system reader. - */ - IgfsSecondaryInputStreamDescriptor(IgfsFileInfo info, IgfsReader secReader) { - assert info != null; - assert secReader != null; - - this.info = info; - this.secReader = secReader; - } - - /** - * @return File info in the primary file system. - */ - IgfsFileInfo info() { - return info; - } - - /** - * @return Secondary file system reader. 
- */ - IgfsReader reader() { - return secReader; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsSecondaryOutputStreamDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsSecondaryOutputStreamDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsSecondaryOutputStreamDescriptor.java deleted file mode 100644 index e093fd0..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsSecondaryOutputStreamDescriptor.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.fs; - -import org.apache.ignite.lang.*; - -import java.io.*; - -/** - * Descriptor of an output stream opened to the secondary file system: holds final references to the parent ID and file info in the primary file system together with the output stream. - */ -public class IgfsSecondaryOutputStreamDescriptor { - /** Parent ID in the primary file system. */ - private final IgniteUuid parentId; - - /** File info in the primary file system. 
*/ - private final IgfsFileInfo info; - - /** Output stream to the secondary file system. */ - private final OutputStream out; - - /** - * Constructor; all arguments are asserted to be non-null. - * - * @param parentId Parent ID in the primary file system. - * @param info File info in the primary file system. - * @param out Output stream to the secondary file system. - */ - IgfsSecondaryOutputStreamDescriptor(IgniteUuid parentId, IgfsFileInfo info, OutputStream out) { - assert parentId != null; - assert info != null; - assert out != null; - - this.parentId = parentId; - this.info = info; - this.out = out; - } - - /** - * @return Parent ID in the primary file system. - */ - IgniteUuid parentId() { - return parentId; - } - - /** - * @return File info in the primary file system. - */ - IgfsFileInfo info() { - return info; - } - - /** - * @return Output stream to the secondary file system. - */ - OutputStream out() { - return out; - } -}