http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsDeleteWorker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsDeleteWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsDeleteWorker.java deleted file mode 100644 index 9e57b2b..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsDeleteWorker.java +++ /dev/null @@ -1,345 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.fs; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.events.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.managers.communication.*; -import org.apache.ignite.internal.managers.eventstorage.*; -import org.apache.ignite.internal.util.future.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; - -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.locks.*; - -import static org.apache.ignite.events.EventType.*; -import static org.apache.ignite.internal.GridTopic.*; -import static org.apache.ignite.internal.processors.fs.IgfsFileInfo.*; - -/** - * GGFS worker for removal from the trash directory. - * Wakes up every {@link #FREQUENCY} milliseconds, or earlier when {@link #signal()} is called, - * and deletes the entries listed in the trash directory. - */ -public class IgfsDeleteWorker extends IgfsThread { - /** Awake frequency in milliseconds. */ - private static final long FREQUENCY = 1000; - - /** How many files/folders to delete at once (i.e. in a single transaction). */ - private static final int MAX_DELETE_BATCH = 100; - - /** GGFS context. */ - private final IgfsContext ggfsCtx; - - /** Metadata manager. */ - private final IgfsMetaManager meta; - - /** Data manager. */ - private final IgfsDataManager data; - - /** Event manager. */ - private final GridEventStorageManager evts; - - /** Logger. */ - private final IgniteLogger log; - - /** Lock. */ - private final Lock lock = new ReentrantLock(); - - /** Condition. */ - private final Condition cond = lock.newCondition(); - - /** Force worker to perform actual delete without waiting. Read and written only while holding {@link #lock}. */ - private boolean force; - - /** Cancellation flag. */ - private volatile boolean cancelled; - - /** Message topic. */ - private Object topic; - - /** - * Constructor. - * - * @param ggfsCtx GGFS context. 
- */ - IgfsDeleteWorker(IgfsContext ggfsCtx) { - super("ggfs-delete-worker%" + ggfsCtx.ggfs().name() + "%" + ggfsCtx.kernalContext().localNodeId() + "%"); - - this.ggfsCtx = ggfsCtx; - - meta = ggfsCtx.meta(); - data = ggfsCtx.data(); - - evts = ggfsCtx.kernalContext().event(); - - String ggfsName = ggfsCtx.ggfs().name(); - - topic = F.isEmpty(ggfsName) ? TOPIC_GGFS : TOPIC_GGFS.topic(ggfsName); - - assert meta != null; - assert data != null; - - log = ggfsCtx.kernalContext().log(IgfsDeleteWorker.class); - } - - /** {@inheritDoc} */ - @Override protected void body() throws InterruptedException { - if (log.isDebugEnabled()) - log.debug("Delete worker started."); - - while (!cancelled) { - lock.lock(); - - try { - if (!cancelled && !force) - cond.await(FREQUENCY, TimeUnit.MILLISECONDS); - - force = false; // Reset force flag. - } - finally { - lock.unlock(); - } - - if (!cancelled) - delete(); - } - } - - /** - * Notify the worker that new entry to delete appeared. - */ - void signal() { - lock.lock(); - - try { - force = true; - - cond.signalAll(); - } - finally { - lock.unlock(); - } - } - - /** - * Cancels the worker: raises the cancellation flag and calls {@code interrupt()}. - */ - void cancel() { - cancelled = true; - - interrupt(); - } - - /** - * Perform cleanup of the trash directory. 
- */ - private void delete() { - IgfsFileInfo info = null; - - try { - info = meta.info(TRASH_ID); - } - catch (IgniteCheckedException e) { - U.error(log, "Cannot obtain trash directory info.", e); - } - - if (info != null) { - for (Map.Entry<String, IgfsListingEntry> entry : info.listing().entrySet()) { - IgniteUuid fileId = entry.getValue().fileId(); - - if (log.isDebugEnabled()) - log.debug("Deleting GGFS trash entry [name=" + entry.getKey() + ", fileId=" + fileId + ']'); - - try { - if (!cancelled) { - if (delete(entry.getKey(), fileId)) { - if (log.isDebugEnabled()) - log.debug("Sending delete confirmation message [name=" + entry.getKey() + - ", fileId=" + fileId + ']'); - - sendDeleteMessage(new IgfsDeleteMessage(fileId)); - } - } - else - break; - } - catch (IgniteInterruptedCheckedException ignored) { - // Ignore this exception while stopping. - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to delete entry from the trash directory: " + entry.getKey(), e); - - sendDeleteMessage(new IgfsDeleteMessage(fileId, e)); - } - } - } - } - - /** - * Remove particular entry from the TRASH directory. - * - * @param name Entry name. - * @param id Entry ID. - * @return {@code True} in case the entry really was deleted from the file system by this call. - * @throws IgniteCheckedException If failed. - */ - private boolean delete(String name, IgniteUuid id) throws IgniteCheckedException { - assert name != null; - assert id != null; - - while (true) { - IgfsFileInfo info = meta.info(id); - - if (info != null) { - if (info.isDirectory()) { - deleteDirectory(TRASH_ID, id); - - if (meta.delete(TRASH_ID, name, id)) - return true; - } - else { - assert info.isFile(); - - // Delete file content first. - // In case this node crashes, other node will re-delete the file. 
- data.delete(info).get(); - - boolean ret = meta.delete(TRASH_ID, name, id); - - if (evts.isRecordable(EVT_GGFS_FILE_PURGED)) { - if (info.path() != null) - evts.record(new IgfsEvent(info.path(), - ggfsCtx.kernalContext().discovery().localNode(), EVT_GGFS_FILE_PURGED)); - else - LT.warn(log, null, "Removing file without path info: " + info); - } - - return ret; - } - } - else - return false; // Entry was deleted concurrently. - } - } - - /** - * Remove the content of particular entry from the trash directory or subdirectory. - * Child entries are deleted in batches of at most {@link #MAX_DELETE_BATCH}; the entry itself - * is not removed from its parent listing by this method. - * - * @param parentId Parent ID. - * @param id Entry id. - * @throws IgniteCheckedException If delete failed for some reason. - */ - private void deleteDirectory(IgniteUuid parentId, IgniteUuid id) throws IgniteCheckedException { - assert parentId != null; - assert id != null; - - while (true) { - IgfsFileInfo info = meta.info(id); - - if (info != null) { - assert info.isDirectory(); - - Map<String, IgfsListingEntry> listing = info.listing(); - - if (listing.isEmpty()) - return; // Directory is empty. - - Map<String, IgfsListingEntry> delListing; - - if (listing.size() <= MAX_DELETE_BATCH) - delListing = listing; - else { - delListing = new HashMap<>(MAX_DELETE_BATCH, 1.0f); - - int i = 0; - - for (Map.Entry<String, IgfsListingEntry> entry : listing.entrySet()) { - delListing.put(entry.getKey(), entry.getValue()); - - if (++i == MAX_DELETE_BATCH) - break; - } - } - - GridCompoundFuture<Object, ?> fut = new GridCompoundFuture<>(ggfsCtx.kernalContext()); - - // Delegate to child folders. - for (IgfsListingEntry entry : delListing.values()) { - if (!cancelled) { - if (entry.isDirectory()) - deleteDirectory(id, entry.fileId()); - else { - IgfsFileInfo fileInfo = meta.info(entry.fileId()); - - if (fileInfo != null) { - assert fileInfo.isFile(); - - fut.add(data.delete(fileInfo)); - } - } - } - else - return; - } - - fut.markInitialized(); - - // Wait for data cache to delete values before clearing meta cache. 
- try { - fut.get(); - } - catch (IgniteFutureCancelledCheckedException ignore) { - // This future can be cancelled only due to GGFS shutdown. - cancelled = true; - - return; - } - - // Actual delete of folder content. - Collection<IgniteUuid> delIds = meta.delete(id, delListing); - - if (delListing == listing && delListing.size() == delIds.size()) - break; // All entries were deleted. - } - else - break; // Entry was deleted concurrently. - } - } - - /** - * Send delete message to all meta cache nodes in the grid. - * - * @param msg Message to send. - */ - private void sendDeleteMessage(IgfsDeleteMessage msg) { - assert msg != null; - - Collection<ClusterNode> nodes = meta.metaCacheNodes(); - - for (ClusterNode node : nodes) { - try { - ggfsCtx.send(node, topic, msg, GridIoPolicy.SYSTEM_POOL); - } - catch (IgniteCheckedException e) { - U.warn(log, "Failed to send GGFS delete message to node [nodeId=" + node.id() + - ", msg=" + msg + ", err=" + e.getMessage() + ']'); - } - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsDirectoryNotEmptyException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsDirectoryNotEmptyException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsDirectoryNotEmptyException.java deleted file mode 100644 index d568334..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsDirectoryNotEmptyException.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.fs; - -import org.apache.ignite.igfs.*; - -/** - * Exception indicating that directory can not be deleted because it is not empty. - */ -public class IgfsDirectoryNotEmptyException extends IgfsException { - /** */ - private static final long serialVersionUID = 0L; - - /** - * Creates an exception with the given message. - * - * @param msg Exception message. - */ - public IgfsDirectoryNotEmptyException(String msg) { - super(msg); - } - - /** - * Creates an instance of GGFS exception caused by nested exception. 
- * - * @param cause Exception cause. - */ - public IgfsDirectoryNotEmptyException(Throwable cause) { - super(cause); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsEx.java deleted file mode 100644 index 1d16f01..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsEx.java +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.fs; - -import org.apache.ignite.*; -import org.apache.ignite.igfs.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.lang.*; -import org.jetbrains.annotations.*; - -import java.net.*; - -/** - * Internal API extension for {@link org.apache.ignite.IgniteFs}. - */ -public interface IgfsEx extends IgniteFs { - /** - * Stops GGFS cleaning all used resources. - */ - public void stop(); - - /** - * @return GGFS context. 
- */ - public IgfsContext context(); - - /** - * Get handshake message. - * - * @return Handshake message. - */ - public IgfsPaths proxyPaths(); - - /** {@inheritDoc} */ - @Override public IgfsInputStreamAdapter open(IgfsPath path, int bufSize, int seqReadsBeforePrefetch) - throws IgniteException; - - /** {@inheritDoc} */ - @Override public IgfsInputStreamAdapter open(IgfsPath path) throws IgniteException; - - /** {@inheritDoc} */ - @Override public IgfsInputStreamAdapter open(IgfsPath path, int bufSize) throws IgniteException; - - /** - * Gets global space counters. - * - * @return Tuple in which first component is used space on all nodes, - * second is available space on all nodes. - * @throws IgniteCheckedException If task execution failed. - */ - public IgfsStatus globalSpace() throws IgniteCheckedException; - - /** - * Enables, disables or clears sampling flag. - * - * @param val {@code True} to turn on sampling, {@code false} to turn it off, {@code null} to clear sampling state. - * @throws IgniteCheckedException If failed. - */ - public void globalSampling(@Nullable Boolean val) throws IgniteCheckedException; - - /** - * Get sampling state. - * - * @return {@code True} in case sampling is enabled, {@code false} otherwise, or {@code null} in case sampling - * flag is not set. - */ - @Nullable public Boolean globalSampling(); - - /** - * Get local metrics. - * - * @return Local metrics. - */ - public IgfsLocalMetrics localMetrics(); - - /** - * Gets group block size, i.e. block size multiplied by group size in affinity mapper. - * - * @return Group block size. - */ - public long groupBlockSize(); - - /** - * Asynchronously await for all entries existing in trash to be removed. - * - * @return Future which will be completed when all entries that existed in trash at the time of invocation are removed. - * @throws IgniteCheckedException If failed. 
- */ - public IgniteInternalFuture<?> awaitDeletesAsync() throws IgniteCheckedException; - - /** - * Gets client file system log directory. - * - * @return Client file system log directory or {@code null} in case no client connections have been created yet. - */ - @Nullable public String clientLogDirectory(); - - /** - * Sets client file system log directory. - * - * @param logDir Client file system log directory. - */ - public void clientLogDirectory(String logDir); - - /** - * Whether this path is excluded from evictions. - * - * @param path Path. - * @param primary Whether the mode is PRIMARY. - * @return {@code True} if path is excluded from evictions. - */ - public boolean evictExclude(IgfsPath path, boolean primary); - - /** - * Get next affinity key. - * - * @return Next affinity key. - */ - public IgniteUuid nextAffinityKey(); - - /** - * Check whether the given path is proxy path. - * - * @param path Path. - * @return {@code True} if proxy. - */ - public boolean isProxy(URI path); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsFileAffinityRange.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsFileAffinityRange.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsFileAffinityRange.java deleted file mode 100644 index 62238c9..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsFileAffinityRange.java +++ /dev/null @@ -1,394 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.fs; - -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.plugin.extensions.communication.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.nio.*; -import java.util.*; - -/** - * Affinity range. - */ -public class IgfsFileAffinityRange extends MessageAdapter implements Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Initial range status, right after creation. */ - public static final int RANGE_STATUS_INITIAL = 0; - - /** Moving range state. Fragmentizer started blocks copy. */ - public static final int RANGE_STATUS_MOVING = 1; - - /** Fragmentizer finished block copy for this range. */ - public static final int RANGE_STATUS_MOVED = 2; - - /** Range affinity key. */ - private IgniteUuid affKey; - - /** Range status, one of the {@code RANGE_STATUS_*} constants. */ - @SuppressWarnings("RedundantFieldInitialization") - private int status = RANGE_STATUS_INITIAL; - - /** Range start offset (divisible by block size). */ - private long startOff; - - /** Range end offset (endOff + 1 divisible by block size). */ - private long endOff; - - /** Transient flag indicating no further writes should be made to this range. 
*/ - private boolean done; - - /** - * Empty constructor required by {@link Externalizable}. - */ - public IgfsFileAffinityRange() { - // No-op. - } - - /** - * @param startOff Start offset. - * @param endOff End offset. - * @param affKey Affinity key. - */ - IgfsFileAffinityRange(long startOff, long endOff, IgniteUuid affKey) { - this.startOff = startOff; - this.endOff = endOff; - this.affKey = affKey; - } - - /** - * Creates new range with updated status. - * - * @param other Initial range. - * @param status Updated status. - */ - IgfsFileAffinityRange(IgfsFileAffinityRange other, int status) { - startOff = other.startOff; - endOff = other.endOff; - affKey = other.affKey; - - this.status = status; - } - - /** - * @return Affinity key for this range. - */ - public IgniteUuid affinityKey() { - return affKey; - } - - /** - * @return Range start offset. - */ - public long startOffset() { - return startOff; - } - - /** - * @return Range end offset. - */ - public long endOffset() { - return endOff; - } - - /** - * @param blockStartOff Block start offset to check. - * @return {@code True} if block with given start offset belongs to this range. - */ - public boolean belongs(long blockStartOff) { - return blockStartOff >= startOff && blockStartOff < endOff; - } - - /** - * @param blockStartOff Block start offset to check. - * @return {@code True} if block with given start offset is located before this range. - */ - public boolean less(long blockStartOff) { - return blockStartOff < startOff; - } - - /** - * @param blockStartOff Block start offset to check. - * @return {@code True} if block with given start offset is located after this range. - */ - public boolean greater(long blockStartOff) { - return blockStartOff > endOff; - } - - /** - * @return If range is empty, i.e. has zero length. - */ - public boolean empty() { - return startOff == endOff; - } - - /** - * @return Range status. 
- */ - public int status() { - return status; - } - - /** - * Expands this range by given block. - * - * @param blockStartOff Offset of block start. - * @param expansionSize Block size. - */ - public void expand(long blockStartOff, int expansionSize) { - // If we are expanding empty range. - if (endOff == startOff) { - assert endOff == blockStartOff : "Failed to expand range [endOff=" + endOff + - ", blockStartOff=" + blockStartOff + ", expansionSize=" + expansionSize + ']'; - - endOff += expansionSize - 1; - } - else { - assert endOff == blockStartOff - 1; - - endOff += expansionSize; - } - } - - /** - * Splits range into a collection of smaller ranges, each at most {@code maxSize} long. - * If this range is not longer than {@code maxSize}, a singleton collection with this range is returned. - * - * @param maxSize Split part maximum size. - * @return Collection of range parts. - */ - public Collection<IgfsFileAffinityRange> split(long maxSize) { - long len = endOff - startOff + 1; - - if (len > maxSize) { - int size = (int)(len / maxSize + 1); - - Collection<IgfsFileAffinityRange> res = new ArrayList<>(size); - - long pos = startOff; - - while (pos < endOff + 1) { - long end = Math.min(pos + maxSize - 1, endOff); - - IgfsFileAffinityRange part = new IgfsFileAffinityRange(pos, end, affKey); - - part.status = status; - - res.add(part); - - pos = end + 1; - } - - return res; - } - else - return Collections.singletonList(this); - } - - /** - * Tries to concatenate this range with a given one. Will return {@code null} if ranges are not adjacent, - * have different affinity keys, or this range is not in the initial status. - * - * @param range Range to concatenate with. - * @return Concatenation result or {@code null} if ranges cannot be concatenated. - */ - @Nullable public IgfsFileAffinityRange concat(IgfsFileAffinityRange range) { - if (endOff + 1 != range.startOff || !F.eq(affKey, range.affKey) || status != RANGE_STATUS_INITIAL) - return null; - - return new IgfsFileAffinityRange(startOff, range.endOff, affKey); - } - - /** - * Marks this range as done. - */ - public void markDone() { - done = true; - } - - /** - * @return Done flag. 
- */ - public boolean done() { - return done; - } - - /** - * Checks if range regions are equal. - * - * @param other Other range to check against. - * @return {@code True} if range regions are equal. - */ - public boolean regionEqual(IgfsFileAffinityRange other) { - return startOff == other.startOff && endOff == other.endOff; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeGridUuid(out, affKey); - - out.writeInt(status); - - out.writeLong(startOff); - out.writeLong(endOff); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - affKey = U.readGridUuid(in); - - status = in.readInt(); - - startOff = in.readLong(); - endOff = in.readLong(); - } - - /** {@inheritDoc} */ - @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) - @Override public MessageAdapter clone() { - IgfsFileAffinityRange _clone = new IgfsFileAffinityRange(); - - clone0(_clone); - - return _clone; - } - - /** {@inheritDoc} */ - @Override protected void clone0(MessageAdapter _msg) { - IgfsFileAffinityRange _clone = (IgfsFileAffinityRange)_msg; - - _clone.affKey = affKey; - _clone.status = status; - _clone.startOff = startOff; - _clone.endOff = endOff; - _clone.done = done; - } - - /** {@inheritDoc} */ - @SuppressWarnings("fallthrough") - @Override public boolean writeTo(ByteBuffer buf) { - writer.setBuffer(buf); - - if (!typeWritten) { - if (!writer.writeByte(null, directType())) - return false; - - typeWritten = true; - } - - switch (state) { - case 0: - if (!writer.writeIgniteUuid("affKey", affKey)) - return false; - - state++; - - case 1: - if (!writer.writeBoolean("done", done)) - return false; - - state++; - - case 2: - if (!writer.writeLong("endOff", endOff)) - return false; - - state++; - - case 3: - if (!writer.writeLong("startOff", startOff)) - return false; - - state++; - - case 4: - if (!writer.writeInt("status", status)) - 
return false; - - state++; - - } - - return true; - } - - /** {@inheritDoc} */ - @SuppressWarnings("fallthrough") - @Override public boolean readFrom(ByteBuffer buf) { - reader.setBuffer(buf); - - switch (state) { - case 0: - affKey = reader.readIgniteUuid("affKey"); - - if (!reader.isLastRead()) - return false; - - state++; - - case 1: - done = reader.readBoolean("done"); - - if (!reader.isLastRead()) - return false; - - state++; - - case 2: - endOff = reader.readLong("endOff"); - - if (!reader.isLastRead()) - return false; - - state++; - - case 3: - startOff = reader.readLong("startOff"); - - if (!reader.isLastRead()) - return false; - - state++; - - case 4: - status = reader.readInt("status"); - - if (!reader.isLastRead()) - return false; - - state++; - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public byte directType() { - return 68; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(IgfsFileAffinityRange.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsFileImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsFileImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsFileImpl.java deleted file mode 100644 index 4d5da40..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsFileImpl.java +++ /dev/null @@ -1,245 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.fs; - -import org.apache.ignite.igfs.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; - -/** - * File or directory information. - */ -public final class IgfsFileImpl implements IgfsFile, Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Path to this file. */ - private IgfsPath path; - - /** File id. */ - private IgniteUuid fileId; - - /** Block size. */ - private int blockSize; - - /** Group block size. */ - private long grpBlockSize; - - /** File length. */ - private long len; - - /** Last access time. */ - private long accessTime; - - /** Last modification time. */ - private long modificationTime; - - /** Properties. */ - private Map<String, String> props; - - /** - * Empty constructor required by {@link Externalizable}. - */ - public IgfsFileImpl() { - // No-op. - } - - /** - * Constructs file or directory instance from file info. Block size, length and group block size - * are set only if the info denotes a file. - * - * @param path Path. - * @param info File info. - * @param globalGrpBlockSize Global group block size, used for files which have no affinity key or have zero length. - */ - public IgfsFileImpl(IgfsPath path, IgfsFileInfo info, long globalGrpBlockSize) { - A.notNull(path, "path"); - A.notNull(info, "info"); - - this.path = path; - fileId = info.id(); - - if (info.isFile()) { - blockSize = info.blockSize(); - len = info.length(); - - grpBlockSize = info.affinityKey() == null ? globalGrpBlockSize : - info.length() == 0 ? 
globalGrpBlockSize : info.length(); - } - - props = info.properties(); - - if (props == null) - props = Collections.emptyMap(); - - accessTime = info.accessTime(); - modificationTime = info.modificationTime(); - } - - /** - * Constructs file instance. - * - * @param path Path. - * @param entry Listing entry. - * @param globalGrpSize Global group block size, used when the entry has no affinity key or has zero length. - */ - public IgfsFileImpl(IgfsPath path, IgfsListingEntry entry, long globalGrpSize) { - A.notNull(path, "path"); - A.notNull(entry, "entry"); - - this.path = path; - fileId = entry.fileId(); - - blockSize = entry.blockSize(); - - grpBlockSize = entry.affinityKey() == null ? globalGrpSize : - entry.length() == 0 ? globalGrpSize : entry.length(); - - len = entry.length(); - props = entry.properties(); - - accessTime = entry.accessTime(); - modificationTime = entry.modificationTime(); - } - - /** {@inheritDoc} */ - @Override public IgfsPath path() { - return path; - } - - /** - * @return File ID. - */ - public IgniteUuid fileId() { - return fileId; - } - - /** {@inheritDoc} */ - @Override public boolean isFile() { - return blockSize > 0; - } - - /** {@inheritDoc} */ - @Override public boolean isDirectory() { - return blockSize == 0; - } - - /** {@inheritDoc} */ - @Override public long length() { - return len; - } - - /** {@inheritDoc} */ - @Override public int blockSize() { - return blockSize; - } - - /** {@inheritDoc} */ - @Override public long groupBlockSize() { - return grpBlockSize; - } - - /** {@inheritDoc} */ - @Override public long accessTime() { - return accessTime; - } - - /** {@inheritDoc} */ - @Override public long modificationTime() { - return modificationTime; - } - - /** {@inheritDoc} */ - @Override public String property(String name) throws IllegalArgumentException { - String val = props.get(name); - - if (val == null) - throw new IllegalArgumentException("File property not found [path=" + path + ", name=" + name + ']'); - - return val; - } - - /** {@inheritDoc} */ - @Override public String property(String name, @Nullable String dfltVal) { - 
String val = props.get(name); - - return val == null ? dfltVal : val; - } - - /** {@inheritDoc} */ - @Override public Map<String, String> properties() { - return props; - } - - /** - * Writes object to data output. Note that the file ID is not written. - * - * @param out Data output. - */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - path.writeExternal(out); - - out.writeInt(blockSize); - out.writeLong(grpBlockSize); - out.writeLong(len); - U.writeStringMap(out, props); - out.writeLong(accessTime); - out.writeLong(modificationTime); - } - - /** - * Reads object from data input. Note that the file ID is not read. - * - * @param in Data input. - */ - @Override public void readExternal(ObjectInput in) throws IOException { - path = new IgfsPath(); - - path.readExternal(in); - - blockSize = in.readInt(); - grpBlockSize = in.readLong(); - len = in.readLong(); - props = U.readStringMap(in); - accessTime = in.readLong(); - modificationTime = in.readLong(); - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return path.hashCode(); - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (o == this) - return true; - - if (o == null || getClass() != o.getClass()) - return false; - - IgfsFileImpl that = (IgfsFileImpl)o; - - return path.equals(that.path); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(IgfsFileImpl.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsFileInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsFileInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsFileInfo.java deleted file mode 100644 index f4956a5..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsFileInfo.java +++ /dev/null @@ -1,569 +0,0 @@ -/* - * Licensed to the 
Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.fs; - -import org.apache.ignite.configuration.*; -import org.apache.ignite.igfs.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; - -/** - * Unmodifiable file information. - */ -public final class IgfsFileInfo implements Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** ID for the root directory. */ - public static final IgniteUuid ROOT_ID = new IgniteUuid(new UUID(0, 0), 0); - - /** ID of the trash directory. */ - public static final IgniteUuid TRASH_ID = new IgniteUuid(new UUID(0, 1), 0); - - /** Info ID. */ - private IgniteUuid id; - - /** File length in bytes. */ - private long len; - - /** File block size, {@code zero} for directories. */ - private int blockSize; - - /** File properties. */ - private Map<String, String> props; - - /** File lock ID. */ - private IgniteUuid lockId; - - /** Affinity key used for single-node file collocation. 
*/ - private IgniteUuid affKey; - - /** File affinity map. */ - private IgfsFileMap fileMap; - - /** Last access time. Modified on-demand. */ - private long accessTime; - - /** Last modification time. */ - private long modificationTime; - - /** Directory listing. */ - @GridToStringInclude - private Map<String, IgfsListingEntry> listing; - - /** Whether data blocks of this entry should never be excluded. */ - private boolean evictExclude; - - /** - * Original file path. This is a helper field used only in some - * operations like delete. - */ - private IgfsPath path; - - /** - * {@link Externalizable} support. - */ - public IgfsFileInfo() { - this(ROOT_ID); - } - - /** - * Constructs directory file info with the given ID. - * - * @param id ID. - */ - IgfsFileInfo(IgniteUuid id) { - this(true, id, 0, 0, null, null, null, null, false, System.currentTimeMillis(), false); - } - - /** - * Constructs directory or file info with {@link org.apache.ignite.configuration.IgfsConfiguration#DFLT_BLOCK_SIZE default} block size. - * - * @param isDir Constructs directory info if {@code true} or file info if {@code false}. - * @param props Meta properties to set. - */ - public IgfsFileInfo(boolean isDir, @Nullable Map<String, String> props) { - this(isDir, null, isDir ? 0 : IgfsConfiguration.DFLT_BLOCK_SIZE, 0, null, null, props, null, false, - System.currentTimeMillis(), false); - } - - /** - * Consturcts directory with random ID and provided listing. - * - * @param listing Listing. - */ - IgfsFileInfo(Map<String, IgfsListingEntry> listing) { - this(true, null, 0, 0, null, listing, null, null, false, System.currentTimeMillis(), false); - } - - /** - * Constructs file info. - * - * @param blockSize Block size. - * @param affKey Affinity key. - * @param evictExclude Eviction exclude flag. - * @param props File properties. 
- */ - IgfsFileInfo(int blockSize, @Nullable IgniteUuid affKey, boolean evictExclude, - @Nullable Map<String, String> props) { - this(false, null, blockSize, 0, affKey, null, props, null, true, System.currentTimeMillis(), evictExclude); - } - - /** - * Constructs file info. - * - * @param blockSize Block size. - * @param len Length. - * @param affKey Affinity key. - * @param lockId Lock ID. - * @param props Properties. - * @param evictExclude Evict exclude flag. - */ - public IgfsFileInfo(int blockSize, long len, @Nullable IgniteUuid affKey, @Nullable IgniteUuid lockId, - boolean evictExclude, @Nullable Map<String, String> props) { - this(false, null, blockSize, len, affKey, null, props, lockId, true, System.currentTimeMillis(), evictExclude); - } - - /** - * Constructs file information. - * - * @param info File information to copy data from. - * @param len Size of a file. - */ - IgfsFileInfo(IgfsFileInfo info, long len) { - this(info.isDirectory(), info.id, info.blockSize, len, info.affKey, info.listing, info.props, info.fileMap(), - info.lockId, true, info.accessTime, info.modificationTime, info.evictExclude()); - } - - /** - * Constructs file info. - * - * @param info File info. - * @param accessTime Last access time. - * @param modificationTime Last modification time. - */ - IgfsFileInfo(IgfsFileInfo info, long accessTime, long modificationTime) { - this(info.isDirectory(), info.id, info.blockSize, info.len, info.affKey, info.listing, info.props, - info.fileMap(), info.lockId, false, accessTime, modificationTime, info.evictExclude()); - } - - /** - * Constructs file information. - * - * @param info File information to copy data from. - * @param props File properties to set. 
- */ - IgfsFileInfo(IgfsFileInfo info, @Nullable Map<String, String> props) { - this(info.isDirectory(), info.id, info.blockSize, info.len, info.affKey, info.listing, props, - info.fileMap(), info.lockId, true, info.accessTime, info.modificationTime, info.evictExclude()); - } - - /** - * Constructs file info. - * - * @param blockSize Block size, - * @param len Size of a file. - * @param props File properties to set. - * @param evictExclude Evict exclude flag. - */ - IgfsFileInfo(int blockSize, long len, boolean evictExclude, @Nullable Map<String, String> props) { - this(false, null, blockSize, len, null, null, props, null, true, System.currentTimeMillis(), evictExclude); - } - - /** - * Constructs file information. - * - * @param info File information to copy data from. - * @param lockId Lock ID. - * @param modificationTime Last modification time. - */ - IgfsFileInfo(IgfsFileInfo info, @Nullable IgniteUuid lockId, long modificationTime) { - this(info.isDirectory(), info.id, info.blockSize, info.len, info.affKey, info.listing, info.props, - info.fileMap(), lockId, true, info.accessTime, modificationTime, info.evictExclude()); - } - - /** - * Constructs file info. - * - * @param listing New directory listing. - * @param old Old file info. - */ - IgfsFileInfo(Map<String, IgfsListingEntry> listing, IgfsFileInfo old) { - this(old.isDirectory(), old.id, old.blockSize, old.len, old.affKey, listing, old.props, old.fileMap(), - old.lockId, false, old.accessTime, old.modificationTime, old.evictExclude()); - } - - /** - * Constructs file info. - * - * @param isDir Constructs directory info if {@code true} or file info if {@code false}. - * @param id ID or {@code null} to generate it automatically. - * @param blockSize Block size. - * @param len Size of a file. - * @param affKey Affinity key for data blocks. - * @param listing Directory listing. - * @param props File properties. - * @param lockId Lock ID. - * @param cpProps Flag to copy properties map. 
- * @param modificationTime Last modification time. - * @param evictExclude Evict exclude flag. - */ - private IgfsFileInfo(boolean isDir, @Nullable IgniteUuid id, int blockSize, long len, @Nullable IgniteUuid affKey, - @Nullable Map<String, IgfsListingEntry> listing, @Nullable Map<String, String> props, - @Nullable IgniteUuid lockId, boolean cpProps, long modificationTime, boolean evictExclude) { - this(isDir, id, blockSize, len, affKey, listing, props, null, lockId, cpProps, modificationTime, - modificationTime, evictExclude); - } - - /** - * Constructs file info. - * - * @param isDir Constructs directory info if {@code true} or file info if {@code false}. - * @param id ID or {@code null} to generate it automatically. - * @param blockSize Block size. - * @param len Size of a file. - * @param affKey Affinity key for data blocks. - * @param listing Directory listing. - * @param props File properties. - * @param fileMap File map. - * @param lockId Lock ID. - * @param cpProps Flag to copy properties map. - * @param accessTime Last access time. - * @param modificationTime Last modification time. - * @param evictExclude Evict exclude flag. - */ - private IgfsFileInfo(boolean isDir, @Nullable IgniteUuid id, int blockSize, long len, @Nullable IgniteUuid affKey, - @Nullable Map<String, IgfsListingEntry> listing, @Nullable Map<String, String> props, - @Nullable IgfsFileMap fileMap, @Nullable IgniteUuid lockId, boolean cpProps, long accessTime, - long modificationTime, boolean evictExclude) { - assert F.isEmpty(listing) || isDir; - - if (isDir) { - assert len == 0 : "Directory length should be zero: " + len; - assert blockSize == 0 : "Directory block size should be zero: " + blockSize; - } - else { - assert len >= 0 : "File length cannot be negative: " + len; - assert blockSize > 0 : "File block size should be positive: " + blockSize; - } - - this.id = id == null ? IgniteUuid.randomUuid() : id; - this.len = isDir ? 0 : len; - this.blockSize = isDir ? 
0 : blockSize; - this.affKey = affKey; - this.listing = listing; - - if (fileMap == null && !isDir) - fileMap = new IgfsFileMap(); - - this.fileMap = fileMap; - this.accessTime = accessTime; - this.modificationTime = modificationTime; - - // Always make a copy of passed properties collection to escape concurrent modifications. - this.props = props == null || props.isEmpty() ? null : - cpProps ? new GridLeanMap<>(props) : props; - - if (listing == null && isDir) - this.listing = Collections.emptyMap(); - - this.lockId = lockId; - this.evictExclude = evictExclude; - } - - /** - * A copy constructor, which takes all data from the specified - * object field-by-field. - * - * @param info An object to copy data info. - */ - public IgfsFileInfo(IgfsFileInfo info) { - this(info.isDirectory(), info.id, info.blockSize, info.len, info.affKey, info.listing, info.props, - info.fileMap(), info.lockId, true, info.accessTime, info.modificationTime, info.evictExclude()); - } - - /** - * Creates a builder for the new instance of file info. - * - * @return A builder to construct a new unmodifiable instance - * of this class. - */ - public static Builder builder() { - return new Builder(new IgfsFileInfo()); - } - - /** - * Creates a builder for the new instance of file info, - * based on the specified origin. - * - * @param origin An origin for new instance, from which - * the data will be copied. - * @return A builder to construct a new unmodifiable instance - * of this class. - */ - public static Builder builder(IgfsFileInfo origin) { - return new Builder(new IgfsFileInfo(origin)); - } - - /** - * Gets this item ID. - * - * @return This item ID. - */ - public IgniteUuid id() { - return id; - } - - /** - * @return {@code True} if this is a file. - */ - public boolean isFile() { - return blockSize > 0; - } - - /** - * @return {@code True} if this is a directory. - */ - public boolean isDirectory() { - return blockSize == 0; - } - - /** - * Get file size. - * - * @return File size. 
- */ - public long length() { - assert isFile(); - - return len; - } - - /** - * Get single data block size to store this file. - * - * @return Single data block size to store this file. - */ - public int blockSize() { - assert isFile(); - - return blockSize; - } - - /** - * @return Number of data blocks to store this file. - */ - public long blocksCount() { - assert isFile(); - - return (len + blockSize() - 1) / blockSize(); - } - - /** - * @return Last access time. - */ - public long accessTime() { - return accessTime; - } - - /** - * @return Last modification time. - */ - public long modificationTime() { - return modificationTime; - } - - /** - * @return Directory listing. - */ - public Map<String, IgfsListingEntry> listing() { - // Always wrap into unmodifiable map to be able to avoid illegal modifications in order pieces of the code. - if (isFile()) - return Collections.unmodifiableMap(Collections.<String, IgfsListingEntry>emptyMap()); - - assert listing != null; - - return Collections.unmodifiableMap(listing); - } - - /** - * @return Affinity key used for single-node file collocation. If {@code null}, usual - * mapper procedure is used for block affinity detection. - */ - @Nullable public IgniteUuid affinityKey() { - return affKey; - } - - /** - * @param affKey Affinity key used for single-node file collocation. - */ - public void affinityKey(IgniteUuid affKey) { - this.affKey = affKey; - } - - /** - * @return File affinity map. - */ - public IgfsFileMap fileMap() { - return fileMap; - } - - /** - * @param fileMap File affinity map. - */ - public void fileMap(IgfsFileMap fileMap) { - this.fileMap = fileMap; - } - - /** - * Get properties of the file. - * - * @return Properties of the file. - */ - public Map<String, String> properties() { - return props == null || props.isEmpty() ? Collections.<String, String>emptyMap() : - Collections.unmodifiableMap(props); - } - - /** - * Get lock ID. 
- * - * @return Lock ID if file is locked or {@code null} if file is free of locks. - */ - @Nullable public IgniteUuid lockId() { - return lockId; - } - - /** - * Get evict exclude flag. - * - * @return Evict exclude flag. - */ - public boolean evictExclude() { - return evictExclude; - } - - /** - * @return Original file path. This is a helper field used only in some operations like delete. - */ - public IgfsPath path() { - return path; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeGridUuid(out, id); - out.writeInt(blockSize); - out.writeLong(len); - U.writeStringMap(out, props); - U.writeGridUuid(out, lockId); - U.writeGridUuid(out, affKey); - out.writeObject(listing); - out.writeObject(fileMap); - out.writeLong(accessTime); - out.writeLong(modificationTime); - out.writeBoolean(evictExclude); - out.writeObject(path); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - id = U.readGridUuid(in); - blockSize = in.readInt(); - len = in.readLong(); - props = U.readStringMap(in); - lockId = U.readGridUuid(in); - affKey = U.readGridUuid(in); - listing = (Map<String, IgfsListingEntry>)in.readObject(); - fileMap = (IgfsFileMap)in.readObject(); - accessTime = in.readLong(); - modificationTime = in.readLong(); - evictExclude = in.readBoolean(); - path = (IgfsPath)in.readObject(); - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return id.hashCode() ^ blockSize ^ (int)(len ^ (len >>> 32)) ^ (props == null ? 0 : props.hashCode()) ^ - (lockId == null ? 
0 : lockId.hashCode()); - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object obj) { - if (obj == this) - return true; - - if (obj == null || getClass() != obj.getClass()) - return false; - - IgfsFileInfo that = (IgfsFileInfo)obj; - - return id.equals(that.id) && blockSize == that.blockSize && len == that.len && F.eq(affKey, that.affKey) && - F.eq(props, that.props) && F.eq(lockId, that.lockId); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(IgfsFileInfo.class, this); - } - - /** - * Builder for {@link IgfsFileInfo}. - */ - @SuppressWarnings("PublicInnerClass") - public static class Builder { - /** Instance to build. */ - private final IgfsFileInfo info; - - /** - * Private constructor. - * - * @param info Instance to build. - */ - private Builder(IgfsFileInfo info) { - this.info = info; - } - - /** - * @param path A new path value. - * @return This builder instance (for chaining). - */ - public Builder path(IgfsPath path) { - info.path = path; - - return this; - } - - /** - * Finishes instance construction and returns a resulting - * unmodifiable instance. - * - * @return A constructed instance. - */ - public IgfsFileInfo build() { - return info; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsFileMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsFileMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsFileMap.java deleted file mode 100644 index 815f023..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsFileMap.java +++ /dev/null @@ -1,361 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.fs; - -import org.apache.ignite.*; -import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; - -import static org.apache.ignite.internal.processors.fs.IgfsFileAffinityRange.*; - -/** - * Auxiliary class that is responsible for managing file affinity keys allocation by ranges. - */ -public class IgfsFileMap implements Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - @GridToStringInclude - /** Sorted list of ranges in ascending order. */ - private List<IgfsFileAffinityRange> ranges; - - /** - * Empty constructor. - */ - public IgfsFileMap() { - // No-op. - } - - /** - * Constructs same file map as passed in. - * - * @param old Old map. - */ - public IgfsFileMap(@Nullable IgfsFileMap old) { - if (old != null && old.ranges != null) { - ranges = new ArrayList<>(old.ranges.size()); - - ranges.addAll(old.ranges); - } - } - - /** - * Gets affinity key from file map based on block start offset. - * - * @param blockOff Block start offset (divisible by block size). 
- * @param includeMoved If {@code true} then will return affinity key for ranges marked as moved. - * Otherwise will return null for such ranges. - * @return Affinity key. - */ - public IgniteUuid affinityKey(long blockOff, boolean includeMoved) { - if (ranges == null) - return null; - - assert !ranges.isEmpty(); - - // Range binary search. - int leftIdx = 0, rightIdx = ranges.size() - 1; - - IgfsFileAffinityRange leftRange = ranges.get(leftIdx); - IgfsFileAffinityRange rightRange = ranges.get(rightIdx); - - // If block offset is less than start of first range, we don't have affinity key. - if (leftRange.less(blockOff)) - return null; - - if (leftRange.belongs(blockOff)) - return leftRange.status() != RANGE_STATUS_MOVED ? leftRange.affinityKey() : - includeMoved ? leftRange.affinityKey() : null; - - if (rightRange.greater(blockOff)) - return null; - - if (rightRange.belongs(blockOff)) - return rightRange.status() != RANGE_STATUS_MOVED ? rightRange.affinityKey() : - includeMoved ? leftRange.affinityKey() : null; - - while (rightIdx - leftIdx > 1) { - int midIdx = (leftIdx + rightIdx) / 2; - - IgfsFileAffinityRange midRange = ranges.get(midIdx); - - if (midRange.belongs(blockOff)) - return midRange.status() != RANGE_STATUS_MOVED ? midRange.affinityKey() : - includeMoved ? leftRange.affinityKey() : null; - - // If offset is less then block start, update right index. - if (midRange.less(blockOff)) - rightIdx = midIdx; - else { - assert midRange.greater(blockOff); - - leftIdx = midIdx; - } - } - - // Range was not found. - return null; - } - - /** - * Updates range status in file map. Will split range into two ranges if given range is a sub-range starting - * from the same offset. - * - * @param range Range to update status. - * @param status New range status. - * @throws IgniteCheckedException If range was not found. 
- */ - public void updateRangeStatus(IgfsFileAffinityRange range, int status) throws IgniteCheckedException { - if (ranges == null) - throw new IgfsInvalidRangeException("Failed to update range status (file map is empty) " + - "[range=" + range + ", ranges=" + ranges + ']'); - - assert !ranges.isEmpty(); - - // Check last. - int lastIdx = ranges.size() - 1; - - IgfsFileAffinityRange last = ranges.get(lastIdx); - - if (last.startOffset() == range.startOffset()) { - updateRangeStatus0(lastIdx, last, range, status); - - return; - } - - // Check first. - int firstIdx = 0; - - IgfsFileAffinityRange first = ranges.get(firstIdx); - - if (first.startOffset() == range.startOffset()) { - updateRangeStatus0(firstIdx, first, range, status); - - return; - } - - // Binary search. - while (lastIdx - firstIdx > 1) { - int midIdx = (firstIdx + lastIdx) / 2; - - IgfsFileAffinityRange midRange = ranges.get(midIdx); - - if (midRange.startOffset() == range.startOffset()) { - updateRangeStatus0(midIdx, midRange, range, status); - - return; - } - - // If range we are looking for is less - if (midRange.less(range.startOffset())) - lastIdx = midIdx; - else { - assert midRange.greater(range.startOffset()); - - firstIdx = midIdx; - } - } - - throw new IgfsInvalidRangeException("Failed to update map for range (corresponding map range " + - "was not found) [range=" + range + ", status=" + status + ", ranges=" + ranges + ']'); - } - - /** - * Deletes range from map. - * - * @param range Range to delete. - */ - public void deleteRange(IgfsFileAffinityRange range) throws IgniteCheckedException { - if (ranges == null) - throw new IgfsInvalidRangeException("Failed to remove range (file map is empty) " + - "[range=" + range + ", ranges=" + ranges + ']'); - - assert !ranges.isEmpty(); - - try { - // Check last. 
- int lastIdx = ranges.size() - 1; - - IgfsFileAffinityRange last = ranges.get(lastIdx); - - if (last.regionEqual(range)) { - assert last.status() == RANGE_STATUS_MOVED; - - ranges.remove(last); - - return; - } - - // Check first. - int firstIdx = 0; - - IgfsFileAffinityRange first = ranges.get(firstIdx); - - if (first.regionEqual(range)) { - assert first.status() == RANGE_STATUS_MOVED; - - ranges.remove(first); - - return; - } - - // Binary search. - while (lastIdx - firstIdx > 1) { - int midIdx = (firstIdx + lastIdx) / 2; - - IgfsFileAffinityRange midRange = ranges.get(midIdx); - - if (midRange.regionEqual(range)) { - assert midRange.status() == RANGE_STATUS_MOVED; - - ranges.remove(midIdx); - - return; - } - - // If range we are looking for is less - if (midRange.less(range.startOffset())) - lastIdx = midIdx; - else { - assert midRange.greater(range.startOffset()); - - firstIdx = midIdx; - } - } - } - finally { - if (ranges.isEmpty()) - ranges = null; - } - - throw new IgfsInvalidRangeException("Failed to remove range from file map (corresponding map range " + - "was not found) [range=" + range + ", ranges=" + ranges + ']'); - } - - /** - * Updates range status at given position (will split range into two if necessary). - * - * @param origIdx Original range index. - * @param orig Original range at index. - * @param update Range being updated. - * @param status New status for range. - */ - private void updateRangeStatus0(int origIdx, IgfsFileAffinityRange orig, IgfsFileAffinityRange update, - int status) { - assert F.eq(orig.affinityKey(), update.affinityKey()); - assert ranges.get(origIdx) == orig; - - if (orig.regionEqual(update)) - ranges.set(origIdx, new IgfsFileAffinityRange(update, status)); - else { - // If range was expanded, new one should be larger. 
- assert orig.endOffset() > update.endOffset(); - - ranges.set(origIdx, new IgfsFileAffinityRange(update, status)); - ranges.add(origIdx + 1, new IgfsFileAffinityRange(update.endOffset() + 1, orig.endOffset(), - orig.affinityKey())); - } - } - - /** - * Gets full list of ranges present in this map. - * - * @return Unmodifiable list of ranges. - */ - public List<IgfsFileAffinityRange> ranges() { - if (ranges == null) - return Collections.emptyList(); - - return Collections.unmodifiableList(ranges); - } - - /** - * Adds range to the list of already existing ranges. Added range must be located after - * the last range in this map. If added range is adjacent to the last range in the map, - * added range will be concatenated to the last one. - * - * @param range Range to add. - */ - public void addRange(IgfsFileAffinityRange range) { - if (range == null || range.empty()) - return; - - // We cannot add range in the middle of the file. - if (ranges == null) { - ranges = new ArrayList<>(); - - ranges.add(range); - - return; - } - - assert !ranges.isEmpty(); - - IgfsFileAffinityRange last = ranges.get(ranges.size() - 1); - - // Ensure that range being added is located to the right of last range in list. - assert last.greater(range.startOffset()) : "Cannot add range to middle of map [last=" + last + - ", range=" + range + ']'; - - // Try to concat last and new range. - IgfsFileAffinityRange concat = last.concat(range); - - // Simply add range to the end of the list if they are not adjacent. 
- if (concat == null) - ranges.add(range); - else - ranges.set(ranges.size() - 1, concat); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - if (ranges == null) - out.writeInt(-1); - else { - assert !ranges.isEmpty(); - - out.writeInt(ranges.size()); - - for (IgfsFileAffinityRange range : ranges) - out.writeObject(range); - } - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - int size = in.readInt(); - - if (size > 0) { - ranges = new ArrayList<>(size); - - for (int i = 0; i < size; i++) - ranges.add((IgfsFileAffinityRange)in.readObject()); - } - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(IgfsFileMap.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsFileWorker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsFileWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsFileWorker.java deleted file mode 100644 index 5d633ee..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsFileWorker.java +++ /dev/null @@ -1,182 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.fs; - -import java.util.concurrent.*; -import java.util.concurrent.locks.*; - -/** - * GGFS file worker for DUAL modes. - */ -public class IgfsFileWorker extends IgfsThread { - /** Time during which thread remains alive since it's last batch is finished. */ - private static final long THREAD_REUSE_WAIT_TIME = 5000; - - /** Lock */ - private final Lock lock = new ReentrantLock(); - - /** Condition. */ - private final Condition cond = lock.newCondition(); - - /** Next queued batch. */ - private IgfsFileWorkerBatch nextBatch; - - /** Batch which is currently being processed. */ - private IgfsFileWorkerBatch curBatch; - - /** Cancellation flag. */ - private volatile boolean cancelled; - - /** - * Creates {@code GGFS} file worker. - * - * @param name Worker name. - */ - IgfsFileWorker(String name) { - super(name); - } - - /** - * Add worker batch. - * - * @return {@code True} if the batch was actually added. - */ - boolean addBatch(IgfsFileWorkerBatch batch) { - assert batch != null; - - lock.lock(); - - try { - if (!cancelled) { - assert nextBatch == null; // Remember, that write operations on a single file are exclusive. - - nextBatch = batch; - - cond.signalAll(); - - return true; - } - else - return false; - } - finally { - lock.unlock(); - } - } - - /** {@inheritDoc} */ - @Override protected void body() throws InterruptedException { - while (!cancelled) { - lock.lock(); - - try { - // If there are no more new batches, wait for several seconds before shutting down the thread. 
- if (!cancelled && nextBatch == null) - cond.await(THREAD_REUSE_WAIT_TIME, TimeUnit.MILLISECONDS); - - curBatch = nextBatch; - - nextBatch = null; - - if (cancelled && curBatch != null) - curBatch.finish(); // Mark the batch as finished if cancelled. - } - finally { - lock.unlock(); - } - - if (curBatch != null) - curBatch.process(); - else { - lock.lock(); - - try { - // No more new batches, we can safely release the worker as it was inactive for too long. - if (nextBatch == null) - cancelled = true; - } - finally { - lock.unlock(); - } - } - } - } - - /** {@inheritDoc} */ - @Override protected void cleanup() { - assert cancelled; // Cleanup can only be performed on a cancelled worker. - - // Clear interrupted flag. - boolean interrupted = interrupted(); - - // Process the last batch if any. - if (nextBatch != null) - nextBatch.process(); - - onFinish(); - - // Reset interrupted flag. - if (interrupted) - interrupt(); - } - - /** - * Forcefully finish execution of all batches. - */ - void cancel() { - lock.lock(); - - try { - cancelled = true; - - if (curBatch != null) - curBatch.finish(); - - if (nextBatch != null) - nextBatch.finish(); - - cond.signalAll(); // Awake the main loop in case it is still waiting for the next batch. - } - finally { - lock.unlock(); - } - } - - /** - * Get current batch. - * - * @return Current batch. - */ - IgfsFileWorkerBatch currentBatch() { - lock.lock(); - - try { - return nextBatch == null ? curBatch : nextBatch; - } - finally { - lock.unlock(); - } - } - - /** - * Callback invoked when worker has processed all it's batches. - */ - protected void onFinish() { - // No-op. 
- } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsFileWorkerBatch.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsFileWorkerBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsFileWorkerBatch.java deleted file mode 100644 index 0b38c0d..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsFileWorkerBatch.java +++ /dev/null @@ -1,236 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.fs; - -import org.apache.ignite.*; -import org.apache.ignite.igfs.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.io.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; -import java.util.concurrent.locks.*; - -/** - * Work batch is an abstraction of the logically grouped tasks. - */ -public class IgfsFileWorkerBatch { - /** Completion latch. 
*/ - private final CountDownLatch completeLatch = new CountDownLatch(1); - - /** Finish guard. */ - private final AtomicBoolean finishGuard = new AtomicBoolean(); - - /** Lock for finish operation. */ - private final ReadWriteLock finishLock = new ReentrantReadWriteLock(); - - /** Tasks queue. */ - private final BlockingDeque<IgfsFileWorkerTask> queue = new LinkedBlockingDeque<>(); - - /** Path to the file in the primary file system. */ - private final IgfsPath path; - - /** Output stream to the file. */ - private final OutputStream out; - - /** Caught exception. */ - private volatile IgniteCheckedException err; - - /** Last task marker. */ - private boolean lastTask; - - /** - * Constructor. - * - * @param path Path to the file in the primary file system. - * @param out Output stream opened to that file. - */ - IgfsFileWorkerBatch(IgfsPath path, OutputStream out) { - assert path != null; - assert out != null; - - this.path = path; - this.out = out; - } - - /** - * Perform write. - * - * @param data Data to be written. - * @return {@code True} in case operation was enqueued. - */ - boolean write(final byte[] data) { - return addTask(new IgfsFileWorkerTask() { - @Override public void execute() throws IgniteCheckedException { - try { - out.write(data); - } - catch (IOException e) { - throw new IgniteCheckedException("Failed to write data to the file due to secondary file system " + - "exception: " + path, e); - } - } - }); - } - - /** - * Process the batch. 
- */ - void process() { - try { - boolean cancelled = false; - - while (!cancelled) { - try { - IgfsFileWorkerTask task = queue.poll(1000, TimeUnit.MILLISECONDS); - - if (task == null) - continue; - - task.execute(); - - if (lastTask) - cancelled = true; - } - catch (IgniteCheckedException e) { - err = e; - - cancelled = true; - } - catch (InterruptedException ignore) { - Thread.currentThread().interrupt(); - - cancelled = true; - } - } - } - finally { - try { - onComplete(); - } - finally { - U.closeQuiet(out); - - completeLatch.countDown(); - } - } - } - - /** - * Add the last task to that batch which will release all the resources. - */ - @SuppressWarnings("LockAcquiredButNotSafelyReleased") - void finish() { - if (finishGuard.compareAndSet(false, true)) { - finishLock.writeLock().lock(); - - try { - queue.add(new IgfsFileWorkerTask() { - @Override public void execute() { - assert queue.isEmpty(); - - lastTask = true; - } - }); - } - finally { - finishLock.writeLock().unlock(); - } - } - } - - /** - * Await for that worker batch to complete. - * - * @throws IgniteCheckedException In case any exception has occurred during batch tasks processing. - */ - void await() throws IgniteCheckedException { - try { - completeLatch.await(); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - - throw new IgniteInterruptedCheckedException(e); - } - - IgniteCheckedException err0 = err; - - if (err0 != null) - throw err0; - } - - /** - * Await for that worker batch to complete in case it was marked as finished. - * - * @throws IgniteCheckedException In case any exception has occurred during batch tasks processing. - */ - void awaitIfFinished() throws IgniteCheckedException { - if (finishGuard.get()) - await(); - } - - /** - * Get primary file system path. - * - * @return Primary file system path. - */ - IgfsPath path() { - return path; - } - - /** - * Callback invoked when all the tasks within the batch are completed. 
- */ - protected void onComplete() { - // No-op. - } - - /** - * Add task to the queue. - * - * @param task Task to add. - * @return {@code True} in case the task was added to the queue. - */ - private boolean addTask(IgfsFileWorkerTask task) { - finishLock.readLock().lock(); - - try { - if (!finishGuard.get()) { - try { - queue.put(task); - - return true; - } - catch (InterruptedException ignore) { - // Task was not enqueued due to interruption. - Thread.currentThread().interrupt(); - - return false; - } - } - else - return false; - - } - finally { - finishLock.readLock().unlock(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsFileWorkerTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsFileWorkerTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsFileWorkerTask.java deleted file mode 100644 index 6192d7a..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsFileWorkerTask.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.fs; - -import org.apache.ignite.*; - -/** - * Generic GGFS worker task which could potentially throw an exception. - */ -public interface IgfsFileWorkerTask { - /** - * Execute task logic. - * - * @throws IgniteCheckedException If failed. - */ - public void execute() throws IgniteCheckedException; -}
