# IGNITE-226: WIP.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/1428b796 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/1428b796 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/1428b796 Branch: refs/heads/ignite-226 Commit: 1428b79640de69f74317fafe63a7e4a37321b9f1 Parents: 84da0e0 Author: vozerov-gridgain <[email protected]> Authored: Fri Feb 13 14:30:28 2015 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Fri Feb 13 14:30:29 2015 +0300 ---------------------------------------------------------------------- .../internal/fs/common/IgfsControlResponse.java | 633 --------------- .../internal/fs/common/IgfsDataInputStream.java | 40 - .../fs/common/IgfsDataOutputStream.java | 43 -- .../fs/common/IgfsHandshakeRequest.java | 93 --- .../internal/fs/common/IgfsIpcCommand.java | 98 --- .../ignite/internal/fs/common/IgfsLogger.java | 767 ------------------- .../internal/fs/common/IgfsMarshaller.java | 374 --------- .../ignite/internal/fs/common/IgfsMessage.java | 41 - .../fs/common/IgfsPathControlRequest.java | 238 ------ .../internal/fs/common/IgfsStatusRequest.java | 35 - .../fs/common/IgfsStreamControlRequest.java | 101 --- .../ignite/internal/fs/common/package.html | 24 - .../igfs/common/IgfsControlResponse.java | 633 +++++++++++++++ .../igfs/common/IgfsDataInputStream.java | 40 + .../igfs/common/IgfsDataOutputStream.java | 43 ++ .../igfs/common/IgfsHandshakeRequest.java | 93 +++ .../internal/igfs/common/IgfsIpcCommand.java | 98 +++ .../ignite/internal/igfs/common/IgfsLogger.java | 767 +++++++++++++++++++ .../internal/igfs/common/IgfsMarshaller.java | 374 +++++++++ .../internal/igfs/common/IgfsMessage.java | 41 + .../igfs/common/IgfsPathControlRequest.java | 238 ++++++ .../internal/igfs/common/IgfsStatusRequest.java | 35 + .../igfs/common/IgfsStreamControlRequest.java | 101 +++ .../ignite/internal/igfs/common/package.html | 24 + .../internal/processors/fs/IgfsIpcHandler.java | 2 +- .../internal/processors/fs/IgfsServer.java | 2 +- .../processors/fs/IgfsServerHandler.java | 2 +- .../visor/ggfs/VisorIgfsProfilerTask.java | 2 +- .../igfs/hadoop/v1/IgfsHadoopFileSystem.java | 6 +- .../igfs/hadoop/v2/IgfsHadoopFileSystem.java | 6 +- .../ignite/internal/fs/hadoop/IgfsHadoop.java | 198 ----- .../IgfsHadoopCommunicationException.java | 57 -- .../internal/fs/hadoop/IgfsHadoopEndpoint.java | 210 ----- .../ignite/internal/fs/hadoop/IgfsHadoopEx.java | 88 --- .../fs/hadoop/IgfsHadoopFSProperties.java | 88 --- .../fs/hadoop/IgfsHadoopFileSystemWrapper.java | 413 ---------- .../internal/fs/hadoop/IgfsHadoopFuture.java | 94 --- .../internal/fs/hadoop/IgfsHadoopInProc.java | 409 ---------- .../fs/hadoop/IgfsHadoopInputStream.java | 626 --------------- .../ignite/internal/fs/hadoop/IgfsHadoopIo.java | 76 -- .../internal/fs/hadoop/IgfsHadoopIpcIo.java | 599 --------------- .../fs/hadoop/IgfsHadoopIpcIoListener.java | 36 - .../internal/fs/hadoop/IgfsHadoopJclLogger.java | 115 --- .../internal/fs/hadoop/IgfsHadoopOutProc.java | 466 ----------- .../fs/hadoop/IgfsHadoopOutputStream.java | 201 ----- .../fs/hadoop/IgfsHadoopProxyInputStream.java | 335 -------- .../fs/hadoop/IgfsHadoopProxyOutputStream.java | 165 ---- .../internal/fs/hadoop/IgfsHadoopReader.java | 104 --- .../fs/hadoop/IgfsHadoopStreamDelegate.java | 96 --- .../hadoop/IgfsHadoopStreamEventListener.java | 39 - .../internal/fs/hadoop/IgfsHadoopUtils.java | 131 ---- .../internal/fs/hadoop/IgfsHadoopWrapper.java | 511 ------------ .../ignite/internal/fs/hadoop/package.html | 24 - .../org/apache/ignite/internal/fs/package.html | 24 - .../ignite/internal/igfs/hadoop/IgfsHadoop.java | 198 +++++ .../IgfsHadoopCommunicationException.java | 57 ++ .../igfs/hadoop/IgfsHadoopEndpoint.java | 210 +++++ .../internal/igfs/hadoop/IgfsHadoopEx.java | 88 +++ .../igfs/hadoop/IgfsHadoopFSProperties.java | 88 +++ .../hadoop/IgfsHadoopFileSystemWrapper.java | 413 ++++++++++ .../internal/igfs/hadoop/IgfsHadoopFuture.java | 94 +++ .../internal/igfs/hadoop/IgfsHadoopInProc.java | 409 ++++++++++ .../igfs/hadoop/IgfsHadoopInputStream.java | 626 +++++++++++++++ .../internal/igfs/hadoop/IgfsHadoopIo.java | 76 ++ .../internal/igfs/hadoop/IgfsHadoopIpcIo.java | 599 +++++++++++++++ .../igfs/hadoop/IgfsHadoopIpcIoListener.java | 36 + .../igfs/hadoop/IgfsHadoopJclLogger.java | 115 +++ .../internal/igfs/hadoop/IgfsHadoopOutProc.java | 466 +++++++++++ .../igfs/hadoop/IgfsHadoopOutputStream.java | 201 +++++ .../igfs/hadoop/IgfsHadoopProxyInputStream.java | 335 ++++++++ .../hadoop/IgfsHadoopProxyOutputStream.java | 165 ++++ .../internal/igfs/hadoop/IgfsHadoopReader.java | 104 +++ .../igfs/hadoop/IgfsHadoopStreamDelegate.java | 96 +++ .../hadoop/IgfsHadoopStreamEventListener.java | 39 + .../internal/igfs/hadoop/IgfsHadoopUtils.java | 131 ++++ .../internal/igfs/hadoop/IgfsHadoopWrapper.java | 511 ++++++++++++ .../ignite/internal/igfs/hadoop/package.html | 24 + .../apache/ignite/internal/igfs/package.html | 24 + .../GridHadoopDefaultMapReducePlanner.java | 2 +- .../apache/ignite/igfs/IgfsEventsTestSuite.java | 2 +- .../IgfsHadoop20FileSystemAbstractSelfTest.java | 2 +- .../igfs/IgfsHadoopDualAbstractSelfTest.java | 2 +- .../IgfsHadoopFileSystemAbstractSelfTest.java | 2 +- .../IgfsHadoopFileSystemClientSelfTest.java | 4 +- .../IgfsHadoopFileSystemHandshakeSelfTest.java | 2 +- .../IgfsHadoopFileSystemIpcCacheSelfTest.java | 2 +- .../IgfsHadoopFileSystemLoggerSelfTest.java | 4 +- ...IgfsHadoopFileSystemLoggerStateSelfTest.java | 2 +- ...fsHadoopFileSystemSecondaryModeSelfTest.java | 2 +- 89 files changed, 7615 insertions(+), 7615 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsControlResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsControlResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsControlResponse.java deleted file mode 100644 index ca8c9ae..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsControlResponse.java +++ /dev/null @@ -1,633 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.fs.common; - -import org.apache.ignite.*; -import org.apache.ignite.igfs.*; -import org.apache.ignite.internal.processors.fs.*; -import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.io.*; -import java.util.*; - -import static org.apache.ignite.internal.fs.common.IgfsIpcCommand.*; - -/** - * GGFS path command response. - */ -public class IgfsControlResponse extends IgfsMessage { - /** Generic error (not GGFS) while performing operations. */ - private static final int ERR_GENERIC = 0; - - /** Generic GGFS error while performing operations. */ - private static final int ERR_GGFS_GENERIC = 1; - - /** Target file not found. */ - private static final int ERR_FILE_NOT_FOUND = 2; - - /** Target path already exists. */ - private static final int ERR_PATH_ALREADY_EXISTS = 3; - - /** Directory is not empty with */ - private static final int ERR_DIRECTORY_NOT_EMPTY = 4; - - /** Target parent is not a directory. */ - private static final int ERR_PARENT_NOT_DIRECTORY = 5; - - /** Secondary HDFS version differs from classpath version. */ - private static final int ERR_INVALID_HDFS_VERSION = 6; - - /** Failed to retrieve file's data block. */ - private static final int ERR_CORRUPTED_FILE = 7; - - /** Response is boolean. */ - public static final int RES_TYPE_BOOLEAN = 0; - - /** Response is Long. */ - public static final int RES_TYPE_LONG = 1; - - /** Response is GridGgfsFile. */ - public static final int RES_TYPE_GGFS_FILE = 2; - - /** Response is GridGgfsFileInfo. */ - public static final int RES_TYPE_GGFS_STREAM_DESCRIPTOR = 3; - - /** Response is GridGgfsPath. */ - public static final int RES_TYPE_GGFS_PATH = 4; - - /** Response is collection of GridGgfsFile. */ - public static final int RES_TYPE_COL_GGFS_FILE = 5; - - /** Response is collection of GridGgfsPath. */ - public static final int RES_TYPE_COL_GGFS_PATH = 6; - - /** Response is collection of GridGgfsBlockLocation. */ - public static final int RES_TYPE_COL_GGFS_BLOCK_LOCATION = 7; - - /** Response is collection of GridGgfsBlockLocation. */ - public static final int RES_TYPE_BYTE_ARRAY = 8; - - /** Response is an error containing stream ID and error message. */ - public static final int RES_TYPE_ERR_STREAM_ID = 9; - - /** Response is a handshake */ - public static final int RES_TYPE_HANDSHAKE = 10; - - /** Response is a handshake */ - public static final int RES_TYPE_STATUS = 11; - - /** Response is a path summary. */ - public static final int RES_TYPE_GGFS_PATH_SUMMARY = 12; - - /** Message header size. */ - public static final int RES_HEADER_SIZE = 9; - - /** We have limited number of object response types. */ - private int resType = -1; - - /** Response. */ - @GridToStringInclude - private Object res; - - /** Bytes length to avoid iteration and summing. */ - private int len; - - /** Error (if any). */ - private String err; - - /** Error code. */ - private int errCode = -1; - - /** - * - */ - public IgfsControlResponse() { - command(CONTROL_RESPONSE); - } - - /** - * @return Response. - */ - public Object response() { - return res; - } - - /** - * @param res Response. - */ - public void response(boolean res) { - resType = RES_TYPE_BOOLEAN; - - this.res = res; - } - - /** - * @param res Response. - */ - public void response(long res) { - resType = RES_TYPE_LONG; - - this.res = res; - } - - /** - * @param res Response. - */ - public void response(byte[][] res) { - resType = RES_TYPE_BYTE_ARRAY; - - this.res = res; - } - - /** - * @param res Response. - */ - public void response(IgfsInputStreamDescriptor res) { - resType = RES_TYPE_GGFS_STREAM_DESCRIPTOR; - - this.res = res; - } - - /** - * @param res Response. - */ - public void response(IgfsFile res) { - resType = RES_TYPE_GGFS_FILE; - - this.res = res; - } - - /** - * @param res Response. - */ - public void response(IgfsPath res) { - resType = RES_TYPE_GGFS_PATH; - - this.res = res; - } - - /** - * @param res Path summary response. - */ - public void response(IgfsPathSummary res) { - resType = RES_TYPE_GGFS_PATH_SUMMARY; - - this.res = res; - } - - /** - * @param res Response. - */ - public void files(Collection<IgfsFile> res) { - resType = RES_TYPE_COL_GGFS_FILE; - - this.res = res; - } - - /** - * @param res Response. - */ - public void paths(Collection<IgfsPath> res) { - resType = RES_TYPE_COL_GGFS_PATH; - - this.res = res; - } - - /** - * @param res Response. - */ - public void locations(Collection<IgfsBlockLocation> res) { - resType = RES_TYPE_COL_GGFS_BLOCK_LOCATION; - - this.res = res; - } - - /** - * @param res Handshake message. - */ - public void handshake(IgfsHandshakeResponse res) { - resType = RES_TYPE_HANDSHAKE; - - this.res = res; - } - - /** - * @param res Status response. - */ - public void status(IgfsStatus res) { - resType = RES_TYPE_STATUS; - - this.res = res; - } - - /** - * @param len Response length. - */ - public void length(int len) { - this.len = len; - } - - /** - * @return Error message if occurred. - */ - public boolean hasError() { - return errCode != -1; - } - - /** - * @param errCode Error code. - * @param err Error. - * @throws IgniteCheckedException Based on error code. - */ - public static void throwError(Integer errCode, String err) throws IgniteCheckedException { - assert err != null; - assert errCode != -1; - - if (errCode == ERR_FILE_NOT_FOUND) - throw new IgfsFileNotFoundException(err); - else if (errCode == ERR_PATH_ALREADY_EXISTS) - throw new IgfsPathAlreadyExistsException(err); - else if (errCode == ERR_DIRECTORY_NOT_EMPTY) - throw new IgfsDirectoryNotEmptyException(err); - else if (errCode == ERR_PARENT_NOT_DIRECTORY) - throw new IgfsParentNotDirectoryException(err); - else if (errCode == ERR_INVALID_HDFS_VERSION) - throw new IgfsInvalidHdfsVersionException(err); - else if (errCode == ERR_CORRUPTED_FILE) - throw new IgfsCorruptedFileException(err); - else if (errCode == ERR_GGFS_GENERIC) - throw new IgfsException(err); - - throw new IgniteCheckedException(err); - } - - /** - * @throws IgniteCheckedException Based on error code. - */ - public void throwError() throws IgniteCheckedException { - throwError(errCode, err); - } - - /** - * @return Error code. - */ - public int errorCode() { - return errCode; - } - - /** - * @param e Error if occurred. - */ - public void error(IgniteCheckedException e) { - err = e.getMessage(); - errCode = errorCode(e); - } - - /** - * @param streamId Stream ID. - * @param err Error message if occurred. - */ - public void error(long streamId, String err) { - resType = RES_TYPE_ERR_STREAM_ID; - - res = streamId; - errCode = ERR_GENERIC; - - this.err = err; - } - - /** - * Gets error code based on exception class. - * - * @param e Exception to analyze. - * @return Error code. - */ - private int errorCode(IgniteCheckedException e) { - return errorCode(e, true); - } - - /** - * Gets error code based on exception class. - * - * @param e Exception to analyze. - * @param checkIo Whether to check for IO exception. - * @return Error code. - */ - @SuppressWarnings("unchecked") - private int errorCode(IgniteCheckedException e, boolean checkIo) { - if (X.hasCause(e, IgfsFileNotFoundException.class)) - return ERR_FILE_NOT_FOUND; - else if (e.hasCause(IgfsPathAlreadyExistsException.class)) - return ERR_PATH_ALREADY_EXISTS; - else if (e.hasCause(IgfsDirectoryNotEmptyException.class)) - return ERR_DIRECTORY_NOT_EMPTY; - else if (e.hasCause(IgfsParentNotDirectoryException.class)) - return ERR_PARENT_NOT_DIRECTORY; - else if (e.hasCause(IgfsInvalidHdfsVersionException.class)) - return ERR_INVALID_HDFS_VERSION; - else if (e.hasCause(IgfsCorruptedFileException.class)) - return ERR_CORRUPTED_FILE; - // This check should be the last. - else if (e.hasCause(IgfsException.class)) - return ERR_GGFS_GENERIC; - - return ERR_GENERIC; - } - - /** - * Writes object to data output. Do not use externalizable interface to avoid marshaller. - * - * @param out Data output. - * @throws IOException If error occurred. - */ - @SuppressWarnings("unchecked") - public void writeExternal(ObjectOutput out) throws IOException { - byte[] hdr = new byte[RES_HEADER_SIZE]; - - U.intToBytes(resType, hdr, 0); - - int off = 4; - - hdr[off++] = err != null ? (byte)1 : (byte)0; - - if (resType == RES_TYPE_BYTE_ARRAY) - U.intToBytes(len, hdr, off); - - out.write(hdr); - - if (err != null) { - out.writeUTF(err); - out.writeInt(errCode); - - if (resType == RES_TYPE_ERR_STREAM_ID) - out.writeLong((Long)res); - - return; - } - - switch (resType) { - case RES_TYPE_BOOLEAN: - out.writeBoolean((Boolean)res); - - break; - - case RES_TYPE_LONG: - out.writeLong((Long)res); - - break; - - case RES_TYPE_BYTE_ARRAY: - byte[][] buf = (byte[][])res; - - for (byte[] bytes : buf) - out.write(bytes); - - break; - - case RES_TYPE_GGFS_PATH: - case RES_TYPE_GGFS_PATH_SUMMARY: - case RES_TYPE_GGFS_FILE: - case RES_TYPE_GGFS_STREAM_DESCRIPTOR: - case RES_TYPE_HANDSHAKE: - case RES_TYPE_STATUS: { - out.writeBoolean(res != null); - - if (res != null) - ((Externalizable)res).writeExternal(out); - - break; - } - - case RES_TYPE_COL_GGFS_FILE: - case RES_TYPE_COL_GGFS_PATH: - case RES_TYPE_COL_GGFS_BLOCK_LOCATION: { - Collection<Externalizable> items = (Collection<Externalizable>)res; - - if (items != null) { - out.writeInt(items.size()); - - for (Externalizable item : items) - item.writeExternal(out); - } - else - out.writeInt(-1); - - break; - } - } - } - - /** - * Reads object from data input. - * - * @param in Data input. - * @throws IOException If read failed. - * @throws ClassNotFoundException If could not find class. - */ - public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - byte[] hdr = new byte[RES_HEADER_SIZE]; - - in.readFully(hdr); - - resType = U.bytesToInt(hdr, 0); - - boolean hasErr = hdr[4] != 0; - - if (hasErr) { - err = in.readUTF(); - errCode = in.readInt(); - - if (resType == RES_TYPE_ERR_STREAM_ID) - res = in.readLong(); - - return; - } - - switch (resType) { - case RES_TYPE_BOOLEAN: - res = in.readBoolean(); - - break; - - case RES_TYPE_LONG: - res = in.readLong(); - - break; - - case RES_TYPE_GGFS_PATH: { - boolean hasVal = in.readBoolean(); - - if (hasVal) { - IgfsPath path = new IgfsPath(); - - path.readExternal(in); - - res = path; - } - - break; - } - - case RES_TYPE_GGFS_PATH_SUMMARY: { - boolean hasVal = in.readBoolean(); - - if (hasVal) { - IgfsPathSummary sum = new IgfsPathSummary(); - - sum.readExternal(in); - - res = sum; - } - - break; - } - - case RES_TYPE_GGFS_FILE: { - boolean hasVal = in.readBoolean(); - - if (hasVal) { - IgfsFileImpl file = new IgfsFileImpl(); - - file.readExternal(in); - - res = file; - } - - break; - } - - case RES_TYPE_GGFS_STREAM_DESCRIPTOR: { - boolean hasVal = in.readBoolean(); - - if (hasVal) { - IgfsInputStreamDescriptor desc = new IgfsInputStreamDescriptor(); - - desc.readExternal(in); - - res = desc; - } - - break; - } - - case RES_TYPE_HANDSHAKE: { - boolean hasVal = in.readBoolean(); - - if (hasVal) { - IgfsHandshakeResponse msg = new IgfsHandshakeResponse(); - - msg.readExternal(in); - - res = msg; - } - - break; - } - - case RES_TYPE_STATUS: { - boolean hasVal = in.readBoolean(); - - if (hasVal) { - IgfsStatus msg = new IgfsStatus(); - - msg.readExternal(in); - - res = msg; - } - - break; - } - - case RES_TYPE_COL_GGFS_FILE: { - Collection<IgfsFile> files = null; - - int size = in.readInt(); - - if (size >= 0) { - files = new ArrayList<>(size); - - for (int i = 0; i < size; i++) { - IgfsFileImpl file = new IgfsFileImpl(); - - file.readExternal(in); - - files.add(file); - } - } - - res = files; - - break; - } - - case RES_TYPE_COL_GGFS_PATH: { - Collection<IgfsPath> paths = null; - - int size = in.readInt(); - - if (size >= 0) { - paths = new ArrayList<>(size); - - for (int i = 0; i < size; i++) { - IgfsPath path = new IgfsPath(); - - path.readExternal(in); - - paths.add(path); - } - } - - res = paths; - - break; - } - - case RES_TYPE_COL_GGFS_BLOCK_LOCATION: { - Collection<IgfsBlockLocation> locations = null; - - int size = in.readInt(); - - if (size >= 0) { - locations = new ArrayList<>(size); - - for (int i = 0; i < size; i++) { - IgfsBlockLocationImpl location = new IgfsBlockLocationImpl(); - - location.readExternal(in); - - locations.add(location); - } - } - - res = locations; - - break; - } - - case RES_TYPE_BYTE_ARRAY: - assert false : "Response type of byte array should never be processed by marshaller."; - } - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(IgfsControlResponse.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsDataInputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsDataInputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsDataInputStream.java deleted file mode 100644 index 4969305..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsDataInputStream.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.fs.common; - -import java.io.*; - -/** - * Data input stream implementing object input but throwing exceptions on object methods. - */ -public class IgfsDataInputStream extends DataInputStream implements ObjectInput { - /** - * Creates a DataInputStream that uses the specified - * underlying InputStream. - * - * @param in The specified input stream - */ - public IgfsDataInputStream(InputStream in) { - super(in); - } - - /** {@inheritDoc} */ - @Override public Object readObject() throws ClassNotFoundException, IOException { - throw new IOException("This method must not be invoked on GGFS data input stream."); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsDataOutputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsDataOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsDataOutputStream.java deleted file mode 100644 index b562d36..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsDataOutputStream.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.fs.common; - -import java.io.*; - -/** - * Data output stream implementing ObjectOutput but throwing exceptions on methods working with objects. - */ -public class IgfsDataOutputStream extends DataOutputStream implements ObjectOutput { - /** - * Creates a new data output stream to write data to the specified - * underlying output stream. The counter <code>written</code> is - * set to zero. - * - * @param out the underlying output stream, to be saved for later - * use. - * @see FilterOutputStream#out - */ - public IgfsDataOutputStream(OutputStream out) { - super(out); - } - - /** {@inheritDoc} */ - @Override public void writeObject(Object obj) throws IOException { - throw new IOException("This method must not be invoked on GGFS data output stream."); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsHandshakeRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsHandshakeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsHandshakeRequest.java deleted file mode 100644 index c758979..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsHandshakeRequest.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.fs.common; - -import org.apache.ignite.internal.util.typedef.internal.*; - -import static org.apache.ignite.internal.fs.common.IgfsIpcCommand.*; - -/** - * Handshake request. - */ -public class IgfsHandshakeRequest extends IgfsMessage { - /** Expected Grid name. */ - private String gridName; - - /** Expected GGFS name. */ - private String ggfsName; - - /** Logger directory. */ - private String logDir; - - /** {@inheritDoc} */ - @Override public IgfsIpcCommand command() { - return HANDSHAKE; - } - - /** {@inheritDoc} */ - @Override public void command(IgfsIpcCommand cmd) { - // No-op. - } - - /** - * @return Grid name. - */ - public String gridName() { - return gridName; - } - - /** - * @param gridName Grid name. - */ - public void gridName(String gridName) { - this.gridName = gridName; - } - - /** - * @return GGFS name. - */ - public String ggfsName() { - return ggfsName; - } - - /** - * @param ggfsName GGFS name. - */ - public void ggfsName(String ggfsName) { - this.ggfsName = ggfsName; - } - - /** - * @return Log directory. - */ - public String logDirectory() { - return logDir; - } - - /** - * @param logDir Log directory. - */ - public void logDirectory(String logDir) { - this.logDir = logDir; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(IgfsHandshakeRequest.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsIpcCommand.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsIpcCommand.java b/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsIpcCommand.java deleted file mode 100644 index 7530a57..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsIpcCommand.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.fs.common; - -import java.util.*; - -/** - * Grid file system commands to call remotely. - */ -public enum IgfsIpcCommand { - /** Handshake command which will send information necessary for client to handle requests correctly. */ - HANDSHAKE, - - /** GGFS status (free/used space). */ - STATUS, - - /** Check specified path exists in the file system. */ - EXISTS, - - /** Get information for the file in specified path. */ - INFO, - - /** Get directory summary. */ - PATH_SUMMARY, - - /** Update information for the file in specified path. */ - UPDATE, - - /** Rename file. */ - RENAME, - - /** Delete file. */ - DELETE, - - /** Make directories. */ - MAKE_DIRECTORIES, - - /** List files under the specified path. */ - LIST_PATHS, - - /** List files under the specified path. */ - LIST_FILES, - - /** Get affinity block locations for data blocks of the file. */ - AFFINITY, - - /** Updates last access and last modification time for a path. */ - SET_TIMES, - - /** Open file for reading as an input stream. */ - OPEN_READ, - - /** Open existent file as output stream to append data to. */ - OPEN_APPEND, - - /** Create file and open output stream for writing data to. */ - OPEN_CREATE, - - /** Close stream. */ - CLOSE, - - /** Read file's data block. */ - READ_BLOCK, - - /** Write file's data block. */ - WRITE_BLOCK, - - /** Server response. */ - CONTROL_RESPONSE; - - /** All values */ - private static final List<IgfsIpcCommand> ALL = Arrays.asList(values()); - - /** - * Resolve command by its ordinal. - * - * @param ordinal Command ordinal. - * @return Resolved command. - */ - public static IgfsIpcCommand valueOf(int ordinal) { - return ALL.get(ordinal); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsLogger.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsLogger.java b/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsLogger.java deleted file mode 100644 index 119ae56..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsLogger.java +++ /dev/null @@ -1,767 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.fs.common; - -import org.apache.ignite.igfs.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jdk8.backport.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; -import java.util.concurrent.locks.*; - -/** - * GGFS client logger writing data to the file. - */ -public final class IgfsLogger { - /** Field delimiter. */ - public static final String DELIM_FIELD = ";"; - - /** Field values delimiter. */ - public static final String DELIM_FIELD_VAL = ","; - - /** Pre-defined header string. */ - public static final String HDR = "Timestamp" + DELIM_FIELD + "ThreadID" + DELIM_FIELD + "PID" + DELIM_FIELD + - "Type" + DELIM_FIELD + "Path" + DELIM_FIELD + "Mode" + DELIM_FIELD + "StreamId" + DELIM_FIELD + "BufSize" + - DELIM_FIELD + "DataLen" + DELIM_FIELD + "Append" + DELIM_FIELD + "Overwrite" + DELIM_FIELD + "Replication" + - DELIM_FIELD + "BlockSize" + DELIM_FIELD + "Position" + DELIM_FIELD + "ReadLen" + DELIM_FIELD + "SkipCnt" + - DELIM_FIELD + "ReadLimit" + DELIM_FIELD + "UserTime" + DELIM_FIELD + "SystemTime" + DELIM_FIELD + - "TotalBytes" + DELIM_FIELD + "DestPath" + DELIM_FIELD + "Recursive" + DELIM_FIELD + "List"; - - /** File open. */ - public static final int TYPE_OPEN_IN = 0; - - /** File create or append. */ - public static final int TYPE_OPEN_OUT = 1; - - /** Random read. */ - public static final int TYPE_RANDOM_READ = 2; - - /** Seek. */ - public static final int TYPE_SEEK = 3; - - /** Skip. */ - public static final int TYPE_SKIP = 4; - - /** Mark. */ - public static final int TYPE_MARK = 5; - - /** Reset. */ - public static final int TYPE_RESET = 6; - - /** Close input stream. */ - public static final int TYPE_CLOSE_IN = 7; - - /** Close output stream. */ - public static final int TYPE_CLOSE_OUT = 8; - - /** Directory creation. */ - public static final int TYPE_DIR_MAKE = 9; - - /** Directory listing. */ - public static final int TYPE_DIR_LIST = 10; - - /** Rename. */ - public static final int TYPE_RENAME = 11; - - /** Delete. */ - public static final int TYPE_DELETE = 12; - - /** Counter for stream identifiers. */ - private static final AtomicLong CNTR = new AtomicLong(); - - /** Loggers. */ - private static final ConcurrentHashMap8<String, IgfsLogger> loggers = - new ConcurrentHashMap8<>(); - - /** Lock for atomic logger adds/removals. */ - private static final ReadWriteLock logLock = new ReentrantReadWriteLock(); - - /** Predefined disabled logger. */ - private static final IgfsLogger disabledLogger = new IgfsLogger(); - - /** Logger enabled flag. */ - private boolean enabled; - - /** Endpoint. */ - private String endpoint; - - /** Batch size. */ - private int batchSize; - - /** File to which data is to be written. */ - private File file; - - /** Read/write lock for concurrent entries collection modification. */ - private ReadWriteLock rwLock; - - /** Flush lock. */ - private Lock flushLock; - - /** Flush condition. */ - private Condition flushCond; - - /** Logged data flusher. */ - private Thread flushWorker; - - /** Process ID. */ - private int pid; - - /** Entries. */ - private Collection<Entry> entries; - - /** Entries counter in order to avoid concurrent collection size checks. */ - private AtomicInteger cnt; - - /** Logger usage counter. */ - private AtomicInteger useCnt; - - /** - * Get next stream ID. - * - * @return Stream ID. - */ - public static long nextId() { - return CNTR.incrementAndGet(); - } - - /** - * Get disabled logger. - * - * @return Disable logger instance. - */ - public static IgfsLogger disabledLogger() { - return disabledLogger; - } - - /** - * Get logger instance for the given endpoint. - * - * @param endpoint Endpoint. - * @param dir Path. - * @param batchSize Batch size. - * - * @return Logger instance. - */ - public static IgfsLogger logger(String endpoint, String ggfsName, String dir, int batchSize) { - if (endpoint == null) - endpoint = ""; - - logLock.readLock().lock(); - - try { - IgfsLogger log = loggers.get(endpoint); - - if (log == null) { - log = new IgfsLogger(endpoint, ggfsName, dir, batchSize); - - IgfsLogger log0 = loggers.putIfAbsent(endpoint, log); - - if (log0 != null) - log = log0; - } - - log.useCnt.incrementAndGet(); - - return log; - } - finally { - logLock.readLock().unlock(); - } - } - - /** - * Construct disabled file logger. - */ - private IgfsLogger() { - // No-op. - } - - /** - * Construct normal file logger. - * - * @param endpoint Endpoint. - * @param ggfsName GGFS name. - * @param dir Log file path. - * @param batchSize Batch size. - */ - private IgfsLogger(String endpoint, String ggfsName, String dir, int batchSize) { - A.notNull(endpoint, "endpoint cannot be null"); - A.notNull(dir, "dir cannot be null"); - A.ensure(batchSize > 0, "batch size cannot be negative"); - - enabled = true; - - this.endpoint = endpoint; - this.batchSize = batchSize; - - pid = U.jvmPid(); - - File dirFile = new File(dir); - - A.ensure(dirFile.isDirectory(), "dir must point to a directory"); - A.ensure(dirFile.exists(), "dir must exist"); - - file = new File(dirFile, "ggfs-log-" + ggfsName + "-" + pid + ".csv"); - - entries = new ConcurrentLinkedDeque8<>(); - - cnt = new AtomicInteger(); - useCnt = new AtomicInteger(); - - rwLock = new ReentrantReadWriteLock(); - flushLock = new ReentrantLock(); - flushCond = flushLock.newCondition(); - - flushWorker = new Thread(new FlushWorker()); - - flushWorker.setDaemon(true); - - flushWorker.start(); - } - - /** - * Check whether logging is enabled. - * - * @return {@code True} in case logging is enabled. - */ - public boolean isLogEnabled() { - return enabled; - } - - /** - * Log file open event. - * - * @param streamId Stream ID. - * @param path Path. - * @param mode Mode. - * @param bufSize Buffer size. - * @param dataLen Data length. - */ - public void logOpen(long streamId, IgfsPath path, IgfsMode mode, int bufSize, long dataLen) { - addEntry(new Entry(TYPE_OPEN_IN, path.toString(), mode, streamId, bufSize, dataLen, null, null, null, null, - null, null, null, null, null, null, null, null, null, null)); - } - - /** - * Log file create event. - * - * @param streamId Stream ID. - * @param path Path. - * @param mode Mode. - * @param overwrite Overwrite flag. - * @param bufSize Buffer size. - * @param replication Replication factor. - * @param blockSize Block size. - */ - public void logCreate(long streamId, IgfsPath path, IgfsMode mode, boolean overwrite, int bufSize, - int replication, long blockSize) { - addEntry(new Entry(TYPE_OPEN_OUT, path.toString(), mode, streamId, bufSize, null, false, overwrite, replication, - blockSize, null, null, null, null, null, null, null, null, null, null)); - } - - /** - * Log file append event. - * - * @param streamId Stream ID. - * @param path Path. - * @param mode Mode. - * @param bufSize Buffer size. - */ - public void logAppend(long streamId, IgfsPath path, IgfsMode mode, int bufSize) { - addEntry(new Entry(TYPE_OPEN_OUT, path.toString(), mode, streamId, bufSize, null, true, null, null, null, null, - null, null, null, null, null, null, null, null, null)); - } - - /** - * Log random read event. - * - * @param streamId Stream ID. - * @param pos Position. - * @param readLen Read bytes count. - */ - public void logRandomRead(long streamId, long pos, int readLen) { - addEntry(new Entry(TYPE_RANDOM_READ, null, null, streamId, null, null, null, null, null, null, pos, readLen, - null, null, null, null, null, null, null, null)); - } - - /** - * Log seek event. - * - * @param streamId Stream ID. - * @param pos Position. - */ - public void logSeek(long streamId, long pos) { - addEntry(new Entry(TYPE_SEEK, null, null, streamId, null, null, null, null, null, null, pos, null, null, null, - null, null, null, null, null, null)); - } - - /** - * Log skip event. - * - * @param streamId Stream ID. - * @param skipCnt Skip bytes count. - */ - public void logSkip(long streamId, long skipCnt) { - addEntry(new Entry(TYPE_SKIP, null, null, streamId, null, null, null, null, null, null, null, null, skipCnt, - null, null, null, null, null, null, null)); - } - - /** - * Log mark event. - * - * @param streamId Stream ID. - * @param readLimit Read limit. - */ - public void logMark(long streamId, long readLimit) { - addEntry(new Entry(TYPE_MARK, null, null, streamId, null, null, null, null, null, null, null, null, null, - readLimit, null, null, null, null, null, null)); - } - - /** - * Log reset event. - * - * @param streamId Stream ID. - */ - public void logReset(long streamId) { - addEntry(new Entry(TYPE_RESET, null, null, streamId, null, null, null, null, null, null, null, null, null, null, - null, null, null, null, null, null)); - } - - /** - * Log input stream close event. - * - * @param streamId Stream ID. - * @param userTime User time. - * @param readTime Read time. - * @param total Total bytes read. - */ - public void logCloseIn(long streamId, long userTime, long readTime, long total) { - addEntry(new Entry(TYPE_CLOSE_IN, null, null, streamId, null, null, null, null, null, null, null, null, null, - null, userTime, readTime, total ,null, null, null)); - } - - /** - * Log output stream close event. - * - * @param streamId Stream ID. - * @param userTime User time. - * @param writeTime Read time. - * @param total Total bytes read. - */ - public void logCloseOut(long streamId, long userTime, long writeTime, long total) { - addEntry(new Entry(TYPE_CLOSE_OUT, null, null, streamId, null, null, null, null, null, null, null, null, null, - null, userTime, writeTime, total, null, null, null)); - } - - /** - * Log directory creation event. - * - * @param path Path. - * @param mode Mode. - */ - public void logMakeDirectory(IgfsPath path, IgfsMode mode) { - addEntry(new Entry(TYPE_DIR_MAKE, path.toString(), mode, null, null, null, null, null, null, null, null, null, - null, null, null, null, null, null, null, null)); - } - - /** - * Log directory listing event. - * - * @param path Path. - * @param mode Mode. - * @param files Files. - */ - public void logListDirectory(IgfsPath path, IgfsMode mode, String[] files) { - addEntry(new Entry(TYPE_DIR_LIST, path.toString(), mode, null, null, null, null, null, null, null, null, null, - null, null, null, null, null, null, null, files)); - } - - /** - * Log rename event. - * - * @param path Path. - * @param mode Mode. - * @param destPath Destination path. - */ - public void logRename(IgfsPath path, IgfsMode mode, IgfsPath destPath) { - addEntry(new Entry(TYPE_RENAME, path.toString(), mode, null, null, null, null, null, null, null, null, null, - null, null, null, null, null, destPath.toString(), null, null)); - } - - /** - * Log delete event. - * - * @param path Path. - * @param mode Mode. - * @param recursive Recursive flag. - */ - public void logDelete(IgfsPath path, IgfsMode mode, boolean recursive) { - addEntry(new Entry(TYPE_DELETE, path.toString(), mode, null, null, null, null, null, null, null, null, null, - null, null, null, null, null, null, recursive, null)); - } - - /** {@inheritDoc} */ - public void close() { - boolean close = false; - - if (useCnt.decrementAndGet() == 0) { - logLock.writeLock().lock(); - - try { - if (useCnt.get() == 0) { - loggers.remove(endpoint); - - close = true; - } - } - finally { - logLock.writeLock().unlock(); - } - } - - if (close) { - U.interrupt(flushWorker); - - try { - U.join(flushWorker); - } - catch (IgniteInterruptedCheckedException ignore) { - // No-op. - } - - entries.clear(); - } - } - - /** - * Add new log entry. - * - * @param entry Entry. - */ - @SuppressWarnings("SignalWithoutCorrespondingAwait") - private void addEntry(Entry entry) { - assert entry != null; - - rwLock.readLock().lock(); - - try { - entries.add(entry); - } - finally { - rwLock.readLock().unlock(); - } - - if (cnt.incrementAndGet() >= batchSize) { - if (flushLock.tryLock()) { - try { - flushCond.signalAll(); - } - finally { - flushLock.unlock(); - } - } - } - } - - /** - * Logged entry. - */ - private class Entry { - /** Thread ID. */ - private final long threadId; - - /** Timestamp. */ - private final long ts; - - /** Event type. */ - private final int type; - - /** File/dir path. */ - private final String path; - - /** Path mode. */ - private IgfsMode mode; - - /** Stream ID. */ - private final long streamId; - - /** Buffer size. Available only for OPEN_IN/OPEN_OUT events */ - private final int bufSize; - - /** Length of data available to read. Available only for OPEN_IN event. */ - private final long dataLen; - - /** Append flag. Available only for OPEN_OUT event. */ - private final Boolean append; - - /** Overwrite flag. Available only for OPEN_OUT event. */ - private final Boolean overwrite; - - /** Replication. Available only for OPEN_OUT event. */ - private final int replication; - - /** Block size. Available only for OPEN_OUT event. */ - private final long blockSize; - - /** Position of data being randomly read or seek. Available only for RANDOM_READ or SEEK events. */ - private final long pos; - - /** Length of data being randomly read. Available only for RANDOM_READ event. */ - private final int readLen; - - /** Amount of skipped bytes. Available only for SKIP event. */ - private final long skipCnt; - - /** Read limit. Available only for MARK event. */ - private final long readLimit; - - /** User time. Available only for CLOSE_IN/CLOSE_OUT events. */ - private final long userTime; - - /** System time (either read or write). Available only for CLOSE_IN/CLOSE_OUT events. */ - private final long sysTime; - - /** Total amount of read or written bytes. Available only for CLOSE_IN/CLOSE_OUT events.*/ - private final long total; - - /** Destination path. Available only for RENAME event. */ - private final String destPath; - - /** Recursive flag. Available only for DELETE event. */ - private final Boolean recursive; - - /** Directory listing. Available only for LIST event. */ - private final String[] list; - - /** - * Constructor. - * - * @param type Event type. - * @param path Path. - * @param mode Path mode. - * @param streamId Stream ID. - * @param bufSize Buffer size. - * @param dataLen Data length. - * @param append Append flag. - * @param overwrite Overwrite flag. - * @param replication Replication. - * @param blockSize Block size. - * @param pos Position. - * @param readLen Read length. - * @param skipCnt Skip count. - * @param readLimit Read limit. - * @param userTime User time. - * @param sysTime System time. - * @param total Read or written bytes. - * @param destPath Destination path. - * @param recursive Recursive flag. - * @param list Listed directories. - */ - Entry(int type, String path, IgfsMode mode, Long streamId, Integer bufSize, Long dataLen, Boolean append, - Boolean overwrite, Integer replication, Long blockSize, Long pos, Integer readLen, Long skipCnt, - Long readLimit, Long userTime, Long sysTime, Long total, String destPath, Boolean recursive, - String[] list) { - threadId = Thread.currentThread().getId(); - ts = U.currentTimeMillis(); - - this.type = type; - this.path = path; - this.mode = mode; - this.streamId = streamId != null ? streamId : -1; - this.bufSize = bufSize != null ? bufSize : -1; - this.dataLen = dataLen != null ? dataLen : -1; - this.append = append; - this.overwrite = overwrite; - this.replication = replication != null ? replication : -1; - this.blockSize = blockSize != null ? blockSize : -1; - this.pos = pos != null ? pos : -1; - this.readLen = readLen != null ? readLen : -1; - this.skipCnt = skipCnt != null ? skipCnt : -1; - this.readLimit = readLimit != null ? readLimit : -1; - this.userTime = userTime != null ? userTime : -1; - this.sysTime = sysTime != null ? sysTime : -1; - this.total = total != null ? total : -1; - this.destPath = destPath; - this.recursive = recursive; - this.list = list; - } - - /** - * Return suitable representation of long value. - * - * @param val Value. - * @return String representation. - */ - private String string(int val) { - return val != -1 ? String.valueOf(val) : ""; - } - - /** - * Return suitable representation of long value. - * - * @param val Value. - * @return String representation. - */ - private String string(long val) { - return val != -1 ? String.valueOf(val) : ""; - } - - /** - * Return suitable representation of the object. - * - * @param val Object. - * @return String representation. - */ - private String string(Object val) { - if (val == null) - return ""; - else if (val instanceof Boolean) - return ((Boolean) val) ? "1" : "0"; - else if (val instanceof String) - return ((String)val).replace(';', '~'); - else if (val instanceof String[]) { - String[] val0 = (String[])val; - - SB buf = new SB(); - - boolean first = true; - - for (String str : val0) { - if (first) - first = false; - else - buf.a(DELIM_FIELD_VAL); - - buf.a(str.replace(';', '~')); - } - - return buf.toString(); - } - else - return val.toString(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - SB res = new SB(); - - res.a(ts).a(DELIM_FIELD).a(threadId).a(DELIM_FIELD).a(pid).a(DELIM_FIELD).a(type).a(DELIM_FIELD) - .a(string(path)).a(DELIM_FIELD).a(string(mode)).a(DELIM_FIELD).a(string(streamId)).a(DELIM_FIELD) - .a(string(bufSize)).a(DELIM_FIELD).a(string(dataLen)).a(DELIM_FIELD).a(string(append)).a(DELIM_FIELD) - .a(string(overwrite)).a(DELIM_FIELD).a(string(replication)).a(DELIM_FIELD).a(string(blockSize)) - .a(DELIM_FIELD).a(string(pos)).a(DELIM_FIELD).a(string(readLen)).a(DELIM_FIELD).a(string(skipCnt)) - .a(DELIM_FIELD).a(string(readLimit)).a(DELIM_FIELD).a(string(userTime)).a(DELIM_FIELD) - .a(string(sysTime)).a(DELIM_FIELD).a(string(total)).a(DELIM_FIELD).a(string(destPath)).a(DELIM_FIELD) - .a(string(recursive)).a(DELIM_FIELD).a(string(list)); - - return res.toString(); - } - } - - /** - * Data flush worker. - */ - private class FlushWorker implements Runnable { - /** {@inheritDoc} */ - @Override public void run() { - Thread t = Thread.currentThread(); - - // We clear interrupted flag here in order to let the final flush proceed normally with IO operations. - while (!Thread.interrupted()) { - flushLock.lock(); - - try { - while (cnt.get() < batchSize && !t.isInterrupted()) { - try { - U.await(flushCond, 1000L, TimeUnit.MILLISECONDS); - } - catch (IgniteInterruptedCheckedException ignore) { - t.interrupt(); - - break; - } - } - } - finally { - flushLock.unlock(); - } - - if (!t.isInterrupted()) - flush(); - } - - // Flush remaining entries. - flush(); - } - - /** - * Flush buffered entries to disk. - */ - @SuppressWarnings("TooBroadScope") - private void flush() { - Collection<Entry> entries0; - - rwLock.writeLock().lock(); - - try { - entries0 = entries; - - entries = new ConcurrentLinkedDeque8<>(); - } - finally { - rwLock.writeLock().unlock(); - } - - // We could lost some increments here, but this is not critical if the new batch will exceed maximum - // size by several items. - cnt.set(0); - - if (!entries0.isEmpty()) { - boolean addHdr = !file.exists(); - - FileOutputStream fos = null; - OutputStreamWriter osw = null; - BufferedWriter bw = null; - - try { - fos = new FileOutputStream(file, true); - osw = new OutputStreamWriter(fos); - bw = new BufferedWriter(osw); - - if (addHdr) - bw.write(HDR + U.nl()); - - for (Entry entry : entries0) - bw.write(entry + U.nl()); - } - catch (IOException e) { - U.error(null, "Failed to flush logged entries to a disk due to an IO exception.", e); - } - finally { - U.closeQuiet(bw); - U.closeQuiet(osw); - U.closeQuiet(fos); - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsMarshaller.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsMarshaller.java b/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsMarshaller.java deleted file mode 100644 index 7637f84..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsMarshaller.java +++ /dev/null @@ -1,374 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.fs.common; - -import org.apache.ignite.*; -import org.apache.ignite.igfs.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; - -import static org.apache.ignite.internal.fs.common.IgfsIpcCommand.*; - -/** - * Implementation of GGFS client message marshaller. - */ -public class IgfsMarshaller { - /** Packet header size. */ - public static final int HEADER_SIZE = 24; - - /** - * Creates new header with given request ID and command. - * - * @param reqId Request ID. - * @param cmd Command. - * @return Created header. - */ - public static byte[] createHeader(long reqId, IgfsIpcCommand cmd) { - assert cmd != null; - - byte[] hdr = new byte[HEADER_SIZE]; - - U.longToBytes(reqId, hdr, 0); - - U.intToBytes(cmd.ordinal(), hdr, 8); - - return hdr; - } - - /** - * Creates new header with given request ID and command. - * - * @param reqId Request ID. - * @param cmd Command. - * @return Created header. - */ - public static byte[] fillHeader(byte[] hdr, long reqId, IgfsIpcCommand cmd) { - assert cmd != null; - - Arrays.fill(hdr, (byte)0); - - U.longToBytes(reqId, hdr, 0); - - U.intToBytes(cmd.ordinal(), hdr, 8); - - return hdr; - } - - /** - * @param msg Message. - * @param hdr Message header. - * @param out Output. - * @throws IgniteCheckedException If failed. - */ - public void marshall(IgfsMessage msg, byte[] hdr, ObjectOutput out) throws IgniteCheckedException { - assert hdr != null; - assert hdr.length == HEADER_SIZE; - - try { - switch (msg.command()) { - case HANDSHAKE: { - out.write(hdr); - - IgfsHandshakeRequest req = (IgfsHandshakeRequest)msg; - - U.writeString(out, req.gridName()); - U.writeString(out, req.ggfsName()); - U.writeString(out, req.logDirectory()); - - break; - } - case STATUS: { - out.write(hdr); - - break; - } - - case EXISTS: - case INFO: - case PATH_SUMMARY: - case UPDATE: - case RENAME: - case DELETE: - case MAKE_DIRECTORIES: - case LIST_PATHS: - case LIST_FILES: - case AFFINITY: - case SET_TIMES: - case OPEN_READ: - case OPEN_APPEND: - case OPEN_CREATE: { - out.write(hdr); - - IgfsPathControlRequest req = (IgfsPathControlRequest)msg; - - writePath(out, req.path()); - writePath(out, req.destinationPath()); - out.writeBoolean(req.flag()); - out.writeBoolean(req.colocate()); - U.writeStringMap(out, req.properties()); - - // Minor optimization. - if (msg.command() == AFFINITY) { - out.writeLong(req.start()); - out.writeLong(req.length()); - } - else if (msg.command() == OPEN_CREATE) { - out.writeInt(req.replication()); - out.writeLong(req.blockSize()); - } - else if (msg.command() == SET_TIMES) { - out.writeLong(req.accessTime()); - out.writeLong(req.modificationTime()); - } - else if (msg.command() == OPEN_READ && req.flag()) - out.writeInt(req.sequentialReadsBeforePrefetch()); - - break; - } - - case CLOSE: - case READ_BLOCK: - case WRITE_BLOCK: { - assert msg.command() != WRITE_BLOCK : "WRITE_BLOCK should be marshalled manually."; - - IgfsStreamControlRequest req = (IgfsStreamControlRequest)msg; - - U.longToBytes(req.streamId(), hdr, 12); - - if (msg.command() == READ_BLOCK) - U.intToBytes(req.length(), hdr, 20); - - out.write(hdr); - - if (msg.command() == READ_BLOCK) - out.writeLong(req.position()); - - break; - } - - case CONTROL_RESPONSE: { - out.write(hdr); - - IgfsControlResponse res = (IgfsControlResponse)msg; - - res.writeExternal(out); - - break; - } - - default: { - assert false : "Invalid command: " + msg.command(); - - throw new IllegalArgumentException("Failed to marshal message (invalid command): " + - msg.command()); - } - } - } - catch (IOException e) { - throw new IgniteCheckedException("Failed to send message to GGFS data node (is data node up and running?)", e); - } - } - - /** - * @param cmd Command. - * @param hdr Header. - * @param in Input. - * @return Message. - * @throws IgniteCheckedException If failed. - */ - public IgfsMessage unmarshall(IgfsIpcCommand cmd, byte[] hdr, ObjectInput in) throws IgniteCheckedException { - assert hdr != null; - assert hdr.length == HEADER_SIZE; - - try { - IgfsMessage msg; - - switch (cmd) { - case HANDSHAKE: { - IgfsHandshakeRequest req = new IgfsHandshakeRequest(); - - req.gridName(U.readString(in)); - req.ggfsName(U.readString(in)); - req.logDirectory(U.readString(in)); - - msg = req; - - break; - } - - case STATUS: { - msg = new IgfsStatusRequest(); - - break; - } - - case EXISTS: - case INFO: - case PATH_SUMMARY: - case UPDATE: - case RENAME: - case DELETE: - case MAKE_DIRECTORIES: - case LIST_PATHS: - case LIST_FILES: - case SET_TIMES: - case AFFINITY: - case OPEN_READ: - case OPEN_APPEND: - case OPEN_CREATE: { - IgfsPathControlRequest req = new IgfsPathControlRequest(); - - req.path(readPath(in)); - req.destinationPath(readPath(in)); - req.flag(in.readBoolean()); - req.colocate(in.readBoolean()); - req.properties(U.readStringMap(in)); - - // Minor optimization. - if (cmd == AFFINITY) { - req.start(in.readLong()); - req.length(in.readLong()); - } - else if (cmd == OPEN_CREATE) { - req.replication(in.readInt()); - req.blockSize(in.readLong()); - } - else if (cmd == SET_TIMES) { - req.accessTime(in.readLong()); - req.modificationTime(in.readLong()); - } - else if (cmd == OPEN_READ && req.flag()) - req.sequentialReadsBeforePrefetch(in.readInt()); - - msg = req; - - break; - } - - case CLOSE: - case READ_BLOCK: - case WRITE_BLOCK: { - IgfsStreamControlRequest req = new IgfsStreamControlRequest(); - - long streamId = U.bytesToLong(hdr, 12); - - req.streamId(streamId); - req.length(U.bytesToInt(hdr, 20)); - - if (cmd == READ_BLOCK) - req.position(in.readLong()); - - msg = req; - - break; - } - - case CONTROL_RESPONSE: { - IgfsControlResponse res = new IgfsControlResponse(); - - res.readExternal(in); - - msg = res; - - break; - } - - default: { - assert false : "Invalid command: " + cmd; - - throw new IllegalArgumentException("Failed to unmarshal message (invalid command): " + cmd); - } - } - - assert msg != null; - - msg.command(cmd); - - return msg; - } - catch (IOException | ClassNotFoundException e) { - throw new IgniteCheckedException("Failed to unmarshal client message: " + cmd, e); - } - } - - /** - * Writes GGFS path to given data output. Can write {@code null} values. - * - * @param out Data output. - * @param path Path to write. - * @throws IOException If write failed. - */ - private void writePath(ObjectOutput out, @Nullable IgfsPath path) throws IOException { - out.writeBoolean(path != null); - - if (path != null) - path.writeExternal(out); - } - - /** - * Reads GGFS path from data input that was written by {@link #writePath(ObjectOutput, org.apache.ignite.igfs.IgfsPath)} - * method. - * - * @param in Data input. - * @return Written path or {@code null}. - */ - @Nullable private IgfsPath readPath(ObjectInput in) throws IOException { - if(in.readBoolean()) { - IgfsPath path = new IgfsPath(); - - path.readExternal(in); - - return path; - } - - return null; - } - - /** - * Writes string to output. - * - * @param out Data output. - * @param str String. - * @throws IOException If write failed. - */ - private void writeString(DataOutput out, @Nullable String str) throws IOException { - out.writeBoolean(str != null); - - if (str != null) - out.writeUTF(str); - } - - /** - * Reads string from input. - * - * @param in Data input. - * @return Read string. - * @throws IOException If read failed. - */ - @Nullable private String readString(DataInput in) throws IOException { - boolean hasStr = in.readBoolean(); - - if (hasStr) - return in.readUTF(); - - return null; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsMessage.java deleted file mode 100644 index 8bd6666..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsMessage.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.fs.common; - -/** - * Abstract class for all messages sent between GGFS client (Hadoop File System implementation) and - * GGFS server (Ignite data node). - */ -public abstract class IgfsMessage { - /** GGFS command. */ - private IgfsIpcCommand cmd; - - /** - * @return Command. - */ - public IgfsIpcCommand command() { - return cmd; - } - - /** - * @param cmd Command. - */ - public void command(IgfsIpcCommand cmd) { - this.cmd = cmd; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsPathControlRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsPathControlRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsPathControlRequest.java deleted file mode 100644 index c24522c..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsPathControlRequest.java +++ /dev/null @@ -1,238 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.fs.common; - -import org.apache.ignite.igfs.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -/** - * Single path command message. This is a plain bean. - */ -public class IgfsPathControlRequest extends IgfsMessage { - /** Main path. */ - private IgfsPath path; - - /** Second path, rename command. */ - private IgfsPath destPath; - - /** Boolean flag, meaning depends on command. */ - private boolean flag; - - /** Boolean flag which controls whether file will be colocated on single node. */ - private boolean colocate; - - /** Properties. */ - private Map<String, String> props; - - /** Sequential reads before prefetch. */ - private int seqReadsBeforePrefetch; - - /** Start pos for affinity command. */ - private long start; - - /** Length for affinity code. */ - private long len; - - /** Hadoop replication factor. */ - private int replication; - - /** Hadoop block size. */ - private long blockSize; - - /** Last access time. */ - private long accessTime; - - /** Last modification time. */ - private long modificationTime; - - /** - * @param path Path. - */ - public void path(IgfsPath path) { - this.path = path; - } - - /** - * @param destPath Destination path (rename only). - */ - public void destinationPath(IgfsPath destPath) { - this.destPath = destPath; - } - - /** - * @param flag Flag value. Meaning depends on command. - */ - public void flag(boolean flag) { - this.flag = flag; - } - - /** - * @param colocate Colocate control flag value. - */ - public void colocate(boolean colocate) { - this.colocate = colocate; - } - - /** - * @param replication Hadoop replication factor. - */ - public void replication(int replication) { - this.replication = replication; - } - - /** - * @param blockSize Hadoop block size. - */ - public void blockSize(long blockSize) { - this.blockSize = blockSize; - } - - /** - * @param props Properties map. - */ - public void properties(@Nullable Map<String, String> props) { - this.props = props; - } - - /** - * @param seqReadsBeforePrefetch Sequential reads before prefetch. - */ - public void sequentialReadsBeforePrefetch(int seqReadsBeforePrefetch) { - this.seqReadsBeforePrefetch = seqReadsBeforePrefetch; - } - - /** - * @param start Start position (affinity command only). - */ - public void start(long start) { - this.start = start; - } - - /** - * @param len Length (affinity command only). - */ - public void length(long len) { - this.len = len; - } - - /** - * @param accessTime Last access time. - */ - public void accessTime(long accessTime) { - this.accessTime = accessTime; - } - - /** - * @param modificationTime Last modification time. - */ - public void modificationTime(long modificationTime) { - this.modificationTime = modificationTime; - } - - /** - * @return Path. - */ - public IgfsPath path() { - return path; - } - - /** - * @return Destination path (rename only). - */ - public IgfsPath destinationPath() { - return destPath; - } - - /** - * @return Flag value (meaning depends on command). - */ - public boolean flag() { - return flag; - } - - /** - * @return Colocate control flag value. - */ - public boolean colocate() { - return colocate; - } - - /** - * @return Hadoop replication factor. - */ - public int replication() { - return replication; - } - - /** - * @return Hadoop block size. - */ - public long blockSize() { - return blockSize; - } - - /** - * @return Properties. - */ - public Map<String, String> properties() { - return props; - } - - /** - * @return Sequential reads before prefetch. - */ - public int sequentialReadsBeforePrefetch() { - return seqReadsBeforePrefetch; - } - - /** - * @return Start position (affinity command only). - */ - public long start() { - return start; - } - - /** - * @return Length (affinity command only). - */ - public long length() { - return len; - } - - /** - * @return Last access time. - */ - public long accessTime() { - return accessTime; - } - - /** - * @return Last modification time. - */ - public long modificationTime() { - return modificationTime; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(IgfsPathControlRequest.class, this, "cmd", command()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsStatusRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsStatusRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsStatusRequest.java deleted file mode 100644 index 6f1c74a..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsStatusRequest.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.fs.common; - -import static org.apache.ignite.internal.fs.common.IgfsIpcCommand.*; - -/** - * GGFS status (total/used/free space) request. - */ -public class IgfsStatusRequest extends IgfsMessage { - /** {@inheritDoc} */ - @Override public IgfsIpcCommand command() { - return STATUS; - } - - /** {@inheritDoc} */ - @Override public void command(IgfsIpcCommand cmd) { - // No-op. - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsStreamControlRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsStreamControlRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsStreamControlRequest.java deleted file mode 100644 index 1a61172..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsStreamControlRequest.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.fs.common; - -import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -/** - * Read block request. - */ -public class IgfsStreamControlRequest extends IgfsMessage { - /** Stream id. */ - private long streamId; - - /** Data. */ - @GridToStringExclude - private byte[] data; - - /** Read position. */ - private long pos; - - /** Length to read. */ - private int len; - - /** - * @return Stream ID. - */ - public long streamId() { - return streamId; - } - - /** - * @param streamId Stream ID. - */ - public void streamId(long streamId) { - this.streamId = streamId; - } - - /** - * @return Data. - */ - public byte[] data() { - return data; - } - - /** - * @param data Data. - */ - public void data(byte[] data) { - this.data = data; - } - - /** - * @return Position. - */ - public long position() { - return pos; - } - - /** - * @param pos Position. - */ - public void position(long pos) { - this.pos = pos; - } - - /** - * @return Length. - */ - public int length() { - return len; - } - - /** - * @param len Length. - */ - public void length(int len) { - this.len = len; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(IgfsStreamControlRequest.class, this, "cmd", command(), - "dataLen", data == null ? 0 : data.length); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/core/src/main/java/org/apache/ignite/internal/fs/common/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/fs/common/package.html b/modules/core/src/main/java/org/apache/ignite/internal/fs/common/package.html deleted file mode 100644 index ed9fac6..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/fs/common/package.html +++ /dev/null @@ -1,24 +0,0 @@ -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> - -<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> -<html> -<body> - <!-- Package description. --> - Contains GGFS classes that are common between client and server. -</body> -</html>
