# ignite-63
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/cfcf46df Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/cfcf46df Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/cfcf46df Branch: refs/heads/ignite-63 Commit: cfcf46df629f35dbeac98ad15fca7fb848724af6 Parents: 1b0e45a Author: sboikov <[email protected]> Authored: Fri Jan 23 00:00:39 2015 +0300 Committer: sboikov <[email protected]> Committed: Fri Jan 23 00:00:48 2015 +0300 ---------------------------------------------------------------------- .../ggfs/GridCacheGgfsEvictionFilter.java | 1 - .../GridCacheGgfsPerBlockLruEvictionPolicy.java | 1 - .../fs/IgniteFsGroupDataBlocksKeyMapper.java | 1 - .../ignite/fs/mapreduce/IgniteFsTask.java | 1 - .../ignite/internal/GridKernalContext.java | 1 - .../ignite/internal/GridKernalContextImpl.java | 1 - .../fs/common/GridGgfsControlResponse.java | 1 - .../processors/fs/GridGgfsAckMessage.java | 212 ++ .../processors/fs/GridGgfsAsyncImpl.java | 297 ++ .../processors/fs/GridGgfsAttributes.java | 186 ++ .../processors/fs/GridGgfsBlockKey.java | 279 ++ .../fs/GridGgfsBlockLocationImpl.java | 258 ++ .../processors/fs/GridGgfsBlocksMessage.java | 257 ++ .../processors/fs/GridGgfsClientSession.java | 75 + .../fs/GridGgfsCommunicationMessage.java | 75 + .../internal/processors/fs/GridGgfsContext.java | 205 ++ .../processors/fs/GridGgfsDataManager.java | 1918 +++++++++++ .../processors/fs/GridGgfsDeleteMessage.java | 206 ++ .../processors/fs/GridGgfsDeleteWorker.java | 349 ++ .../fs/GridGgfsDirectoryNotEmptyException.java | 44 + .../internal/processors/fs/GridGgfsEx.java | 142 + .../fs/GridGgfsFileAffinityRange.java | 396 +++ .../processors/fs/GridGgfsFileInfo.java | 568 ++++ .../internal/processors/fs/GridGgfsFileMap.java | 359 +++ .../processors/fs/GridGgfsFileWorker.java | 182 ++ .../processors/fs/GridGgfsFileWorkerBatch.java | 235 ++ .../processors/fs/GridGgfsFileWorkerTask.java | 32 + .../fs/GridGgfsFragmentizerManager.java | 828 +++++ .../fs/GridGgfsFragmentizerRequest.java | 212 ++ .../fs/GridGgfsFragmentizerResponse.java | 131 + .../fs/GridGgfsHandshakeResponse.java | 122 + .../internal/processors/fs/GridGgfsHelper.java | 49 + .../processors/fs/GridGgfsHelperImpl.java | 54 + .../internal/processors/fs/GridGgfsImpl.java | 2151 +++++++++++++ .../fs/GridGgfsInputStreamAdapter.java | 49 + .../fs/GridGgfsInputStreamDescriptor.java | 78 + .../processors/fs/GridGgfsInputStreamImpl.java | 532 ++++ .../fs/GridGgfsInvalidRangeException.java | 43 + .../processors/fs/GridGgfsIpcHandler.java | 564 ++++ .../internal/processors/fs/GridGgfsJobImpl.java | 117 + .../processors/fs/GridGgfsListingEntry.java | 197 ++ .../processors/fs/GridGgfsLocalMetrics.java | 212 ++ .../internal/processors/fs/GridGgfsManager.java | 155 + .../processors/fs/GridGgfsMetaManager.java | 2991 +++++++++++++++++ .../processors/fs/GridGgfsModeResolver.java | 177 ++ .../internal/processors/fs/GridGgfsPaths.java | 124 + .../processors/fs/GridGgfsProcessor.java | 463 +++ .../processors/fs/GridGgfsProcessorAdapter.java | 80 + .../processors/fs/GridGgfsSamplingKey.java | 83 + .../GridGgfsSecondaryInputStreamDescriptor.java | 59 + ...GridGgfsSecondaryOutputStreamDescriptor.java | 74 + .../internal/processors/fs/GridGgfsServer.java | 427 +++ .../processors/fs/GridGgfsServerHandler.java | 57 + .../processors/fs/GridGgfsServerManager.java | 211 ++ .../internal/processors/fs/GridGgfsStatus.java | 76 + .../processors/fs/GridGgfsSyncMessage.java | 161 + .../internal/processors/fs/GridGgfsThread.java | 82 + .../processors/fs/GridNoopGgfsHelper.java | 41 + .../processors/fs/GridNoopGgfsProcessor.java | 71 + .../processors/fs/IgniteFsFileImpl.java | 245 ++ .../processors/fs/IgniteFsMetricsAdapter.java | 239 ++ .../fs/IgniteFsOutputStreamAdapter.java | 263 ++ .../processors/fs/IgniteFsOutputStreamImpl.java | 504 +++ .../processors/fs/IgniteFsTaskArgsImpl.java | 135 + .../ignite/internal/processors/fs/package.html | 23 + .../visor/ggfs/VisorGgfsSamplingStateTask.java | 1 - .../visor/node/VisorNodeDataCollectorJob.java | 1 - .../internal/visor/util/VisorTaskUtils.java | 1 - .../processors/ggfs/GridGgfsAckMessage.java | 212 -- .../processors/ggfs/GridGgfsAsyncImpl.java | 298 -- .../processors/ggfs/GridGgfsAttributes.java | 186 -- .../processors/ggfs/GridGgfsBlockKey.java | 279 -- .../ggfs/GridGgfsBlockLocationImpl.java | 258 -- .../processors/ggfs/GridGgfsBlocksMessage.java | 257 -- .../processors/ggfs/GridGgfsClientSession.java | 75 - .../ggfs/GridGgfsCommunicationMessage.java | 75 - .../kernal/processors/ggfs/GridGgfsContext.java | 205 -- .../processors/ggfs/GridGgfsDataManager.java | 1918 ----------- .../processors/ggfs/GridGgfsDeleteMessage.java | 206 -- .../processors/ggfs/GridGgfsDeleteWorker.java | 350 -- .../GridGgfsDirectoryNotEmptyException.java | 44 - .../grid/kernal/processors/ggfs/GridGgfsEx.java | 143 - .../ggfs/GridGgfsFileAffinityRange.java | 396 --- .../processors/ggfs/GridGgfsFileInfo.java | 568 ---- .../kernal/processors/ggfs/GridGgfsFileMap.java | 361 --- .../processors/ggfs/GridGgfsFileWorker.java | 182 -- .../ggfs/GridGgfsFileWorkerBatch.java | 235 -- .../processors/ggfs/GridGgfsFileWorkerTask.java | 32 - .../ggfs/GridGgfsFragmentizerManager.java | 829 ----- .../ggfs/GridGgfsFragmentizerRequest.java | 212 -- .../ggfs/GridGgfsFragmentizerResponse.java | 131 - .../ggfs/GridGgfsHandshakeResponse.java | 122 - .../kernal/processors/ggfs/GridGgfsHelper.java | 49 - .../processors/ggfs/GridGgfsHelperImpl.java | 54 - .../kernal/processors/ggfs/GridGgfsImpl.java | 2152 ------------- .../ggfs/GridGgfsInputStreamAdapter.java | 49 - .../ggfs/GridGgfsInputStreamDescriptor.java | 78 - .../ggfs/GridGgfsInputStreamImpl.java | 532 ---- .../ggfs/GridGgfsInvalidRangeException.java | 43 - .../processors/ggfs/GridGgfsIpcHandler.java | 564 ---- .../kernal/processors/ggfs/GridGgfsJobImpl.java | 117 - .../processors/ggfs/GridGgfsListingEntry.java | 197 -- .../processors/ggfs/GridGgfsLocalMetrics.java | 212 -- .../kernal/processors/ggfs/GridGgfsManager.java | 156 - .../processors/ggfs/GridGgfsMetaManager.java | 2992 ------------------ .../processors/ggfs/GridGgfsModeResolver.java | 177 -- .../kernal/processors/ggfs/GridGgfsPaths.java | 124 - .../processors/ggfs/GridGgfsProcessor.java | 463 --- .../ggfs/GridGgfsProcessorAdapter.java | 80 - .../processors/ggfs/GridGgfsSamplingKey.java | 83 - .../GridGgfsSecondaryInputStreamDescriptor.java | 59 - ...GridGgfsSecondaryOutputStreamDescriptor.java | 74 - .../kernal/processors/ggfs/GridGgfsServer.java | 427 --- .../processors/ggfs/GridGgfsServerHandler.java | 57 - .../processors/ggfs/GridGgfsServerManager.java | 211 -- .../kernal/processors/ggfs/GridGgfsStatus.java | 76 - .../processors/ggfs/GridGgfsSyncMessage.java | 161 - .../kernal/processors/ggfs/GridGgfsThread.java | 82 - .../processors/ggfs/GridNoopGgfsHelper.java | 41 - .../processors/ggfs/GridNoopGgfsProcessor.java | 71 - .../processors/ggfs/IgniteFsFileImpl.java | 245 -- .../processors/ggfs/IgniteFsMetricsAdapter.java | 239 -- .../ggfs/IgniteFsOutputStreamAdapter.java | 263 -- .../ggfs/IgniteFsOutputStreamImpl.java | 504 --- .../processors/ggfs/IgniteFsTaskArgsImpl.java | 135 - .../grid/kernal/processors/ggfs/package.html | 23 - ...heGgfsPerBlockLruEvictionPolicySelfTest.java | 489 +++ .../processors/fs/GridGgfsAbstractSelfTest.java | 2453 ++++++++++++++ .../fs/GridGgfsAttributesSelfTest.java | 75 + .../processors/fs/GridGgfsCacheSelfTest.java | 133 + .../fs/GridGgfsCommonAbstractTest.java | 67 + .../fs/GridGgfsDataManagerSelfTest.java | 599 ++++ .../fs/GridGgfsDualAbstractSelfTest.java | 1601 ++++++++++ .../fs/GridGgfsDualAsyncSelfTest.java | 32 + .../processors/fs/GridGgfsDualSyncSelfTest.java | 32 + .../processors/fs/GridGgfsFileInfoSelfTest.java | 88 + .../processors/fs/GridGgfsFileMapSelfTest.java | 335 ++ ...GgfsGroupDataBlockKeyMapperHashSelfTest.java | 136 + .../fs/GridGgfsMetaManagerSelfTest.java | 466 +++ .../processors/fs/GridGgfsMetricsSelfTest.java | 536 ++++ .../fs/GridGgfsModeResolverSelfTest.java | 77 + .../processors/fs/GridGgfsModesSelfTest.java | 604 ++++ .../GridGgfsPrimaryOffheapTieredSelfTest.java | 33 + .../GridGgfsPrimaryOffheapValuesSelfTest.java | 33 + .../processors/fs/GridGgfsPrimarySelfTest.java | 32 + .../fs/GridGgfsProcessorSelfTest.java | 977 ++++++ .../fs/GridGgfsProcessorValidationSelfTest.java | 535 ++++ ...IpcEndpointRegistrationAbstractSelfTest.java | 185 ++ ...dpointRegistrationOnLinuxAndMacSelfTest.java | 50 + ...pcEndpointRegistrationOnWindowsSelfTest.java | 52 + .../processors/fs/GridGgfsSizeSelfTest.java | 874 +++++ .../processors/fs/GridGgfsStreamsSelfTest.java | 472 +++ .../processors/fs/GridGgfsTaskSelfTest.java | 316 ++ .../processors/fs/GridGgfsTestInputStream.java | 66 + .../ignite/internal/processors/fs/package.html | 23 + .../GridGgfsAbstractRecordResolverSelfTest.java | 187 ++ ...GgfsByteDelimiterRecordResolverSelfTest.java | 335 ++ ...idGgfsFixedLengthRecordResolverSelfTest.java | 147 + ...sNewLineDelimiterRecordResolverSelfTest.java | 129 + ...fsStringDelimiterRecordResolverSelfTest.java | 137 + ...idIpcServerEndpointDeserializerSelfTest.java | 1 - .../ggfs/GridGgfsEventsAbstractSelfTest.java | 1 - .../GridGgfsFragmentizerAbstractSelfTest.java | 1 - .../grid/ggfs/GridGgfsFragmentizerSelfTest.java | 1 - ...heGgfsPerBlockLruEvictionPolicySelfTest.java | 489 --- .../ggfs/GridGgfsAbstractSelfTest.java | 2453 -------------- .../ggfs/GridGgfsAttributesSelfTest.java | 75 - .../processors/ggfs/GridGgfsCacheSelfTest.java | 134 - .../ggfs/GridGgfsCommonAbstractTest.java | 67 - .../ggfs/GridGgfsDataManagerSelfTest.java | 599 ---- .../ggfs/GridGgfsDualAbstractSelfTest.java | 1601 ---------- .../ggfs/GridGgfsDualAsyncSelfTest.java | 32 - .../ggfs/GridGgfsDualSyncSelfTest.java | 32 - .../ggfs/GridGgfsFileInfoSelfTest.java | 88 - .../ggfs/GridGgfsFileMapSelfTest.java | 337 -- ...GgfsGroupDataBlockKeyMapperHashSelfTest.java | 136 - .../ggfs/GridGgfsMetaManagerSelfTest.java | 468 --- .../ggfs/GridGgfsMetricsSelfTest.java | 536 ---- .../ggfs/GridGgfsModeResolverSelfTest.java | 77 - .../processors/ggfs/GridGgfsModesSelfTest.java | 604 ---- .../GridGgfsPrimaryOffheapTieredSelfTest.java | 33 - .../GridGgfsPrimaryOffheapValuesSelfTest.java | 33 - .../ggfs/GridGgfsPrimarySelfTest.java | 32 - .../ggfs/GridGgfsProcessorSelfTest.java | 977 ------ .../GridGgfsProcessorValidationSelfTest.java | 535 ---- ...IpcEndpointRegistrationAbstractSelfTest.java | 185 -- ...dpointRegistrationOnLinuxAndMacSelfTest.java | 50 - ...pcEndpointRegistrationOnWindowsSelfTest.java | 54 - .../processors/ggfs/GridGgfsSizeSelfTest.java | 875 ----- .../ggfs/GridGgfsStreamsSelfTest.java | 472 --- .../processors/ggfs/GridGgfsTaskSelfTest.java | 316 -- .../ggfs/GridGgfsTestInputStream.java | 66 - .../grid/kernal/processors/ggfs/package.html | 23 - .../GridGgfsAbstractRecordResolverSelfTest.java | 187 -- ...GgfsByteDelimiterRecordResolverSelfTest.java | 335 -- ...idGgfsFixedLengthRecordResolverSelfTest.java | 147 - ...sNewLineDelimiterRecordResolverSelfTest.java | 129 - ...fsStringDelimiterRecordResolverSelfTest.java | 137 - .../testsuites/bamboo/GridGgfsTestSuite.java | 2 - .../GridHadoopDefaultMapReducePlanner.java | 1 - .../hadoop/v1/GridGgfsHadoopFileSystem.java | 1 - .../hadoop/v2/GridGgfsHadoopFileSystem.java | 1 - .../grid/kernal/ggfs/hadoop/GridGgfsHadoop.java | 2 - .../hadoop/GridGgfsHadoopFileSystemWrapper.java | 1 - .../ggfs/hadoop/GridGgfsHadoopInProc.java | 1 - .../ggfs/hadoop/GridGgfsHadoopOutProc.java | 1 - .../kernal/ggfs/hadoop/GridGgfsHadoopUtils.java | 2 - .../ggfs/hadoop/GridGgfsHadoopWrapper.java | 1 - .../hadoop/GridHadoopAbstractWordCountTest.java | 1 - .../hadoop/GridHadoopCommandLineTest.java | 1 - ...idHadoopDefaultMapReducePlannerSelfTest.java | 1 - ...dGgfsHadoop20FileSystemAbstractSelfTest.java | 1 - .../GridGgfsHadoopDualAbstractSelfTest.java | 2 - ...ridGgfsHadoopFileSystemAbstractSelfTest.java | 1 - .../GridGgfsHadoopFileSystemClientSelfTest.java | 1 - ...idGgfsHadoopFileSystemHandshakeSelfTest.java | 1 - ...ridGgfsHadoopFileSystemIpcCacheSelfTest.java | 1 - .../GridGgfsHadoopFileSystemLoggerSelfTest.java | 1 - ...GgfsHadoopFileSystemLoggerStateSelfTest.java | 1 - ...fsHadoopFileSystemSecondaryModeSelfTest.java | 1 - .../bamboo/GridGgfsLinuxAndMacOSTestSuite.java | 1 - 221 files changed, 30661 insertions(+), 30719 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/apache/ignite/cache/eviction/ggfs/GridCacheGgfsEvictionFilter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/ggfs/GridCacheGgfsEvictionFilter.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/ggfs/GridCacheGgfsEvictionFilter.java index 0dd98b7..114b757 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/ggfs/GridCacheGgfsEvictionFilter.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/eviction/ggfs/GridCacheGgfsEvictionFilter.java @@ -19,7 +19,6 @@ package org.apache.ignite.cache.eviction.ggfs; import org.apache.ignite.cache.*; import org.apache.ignite.cache.eviction.*; -import org.gridgain.grid.kernal.processors.ggfs.*; /** * GGFS eviction filter which will not evict blocks of particular files. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/apache/ignite/cache/eviction/ggfs/GridCacheGgfsPerBlockLruEvictionPolicy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/ggfs/GridCacheGgfsPerBlockLruEvictionPolicy.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/ggfs/GridCacheGgfsPerBlockLruEvictionPolicy.java index 8662dc5..856ef4c 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/ggfs/GridCacheGgfsPerBlockLruEvictionPolicy.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/eviction/ggfs/GridCacheGgfsPerBlockLruEvictionPolicy.java @@ -21,7 +21,6 @@ import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cache.eviction.*; import org.apache.ignite.fs.*; -import org.gridgain.grid.kernal.processors.ggfs.*; import org.jdk8.backport.*; import org.jdk8.backport.ConcurrentLinkedDeque8.*; import org.jetbrains.annotations.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/apache/ignite/fs/IgniteFsGroupDataBlocksKeyMapper.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/fs/IgniteFsGroupDataBlocksKeyMapper.java b/modules/core/src/main/java/org/apache/ignite/fs/IgniteFsGroupDataBlocksKeyMapper.java index c7f3bfc..55c0fc7 100644 --- a/modules/core/src/main/java/org/apache/ignite/fs/IgniteFsGroupDataBlocksKeyMapper.java +++ b/modules/core/src/main/java/org/apache/ignite/fs/IgniteFsGroupDataBlocksKeyMapper.java @@ -18,7 +18,6 @@ package org.apache.ignite.fs; import org.gridgain.grid.kernal.processors.cache.*; -import org.gridgain.grid.kernal.processors.ggfs.*; import org.apache.ignite.internal.util.typedef.internal.*; /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsTask.java b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsTask.java index c84cdd4..3cb794b 100644 --- a/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsTask.java +++ b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsTask.java @@ -23,7 +23,6 @@ import org.apache.ignite.compute.*; import org.apache.ignite.fs.*; import org.apache.ignite.internal.*; import org.apache.ignite.resources.*; -import org.gridgain.grid.kernal.processors.ggfs.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.jetbrains.annotations.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java index abee238..aa5b24c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java @@ -40,7 +40,6 @@ import org.gridgain.grid.kernal.processors.closure.*; import org.gridgain.grid.kernal.processors.continuous.*; import org.gridgain.grid.kernal.processors.dataload.*; import org.apache.ignite.internal.processors.email.*; -import org.gridgain.grid.kernal.processors.ggfs.*; import org.apache.ignite.internal.processors.hadoop.*; import org.gridgain.grid.kernal.processors.interop.*; import org.apache.ignite.internal.processors.job.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java index a7986d2..ea3ba95 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java @@ -42,7 +42,6 @@ import org.gridgain.grid.kernal.processors.closure.*; import org.gridgain.grid.kernal.processors.continuous.*; import org.gridgain.grid.kernal.processors.dataload.*; import org.apache.ignite.internal.processors.email.*; -import org.gridgain.grid.kernal.processors.ggfs.*; import org.apache.ignite.internal.processors.hadoop.*; import org.gridgain.grid.kernal.processors.interop.*; import org.apache.ignite.internal.processors.job.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/apache/ignite/internal/fs/common/GridGgfsControlResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/fs/common/GridGgfsControlResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/fs/common/GridGgfsControlResponse.java index 3939064..bebbb03 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/fs/common/GridGgfsControlResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/fs/common/GridGgfsControlResponse.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.fs.common; import org.apache.ignite.*; import org.apache.ignite.fs.*; -import org.gridgain.grid.kernal.processors.ggfs.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsAckMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsAckMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsAckMessage.java new file mode 100644 index 0000000..eabc2b1 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsAckMessage.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.fs; + +import org.apache.ignite.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.marshaller.*; +import org.apache.ignite.internal.util.direct.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.nio.*; + +/** + * Block write request acknowledgement message. + */ +public class GridGgfsAckMessage extends GridGgfsCommunicationMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** File id. */ + private IgniteUuid fileId; + + /** Request ID to ack. */ + private long id; + + /** Write exception. */ + @GridDirectTransient + private IgniteCheckedException err; + + /** */ + private byte[] errBytes; + + /** + * Empty constructor required by {@link Externalizable}. + */ + public GridGgfsAckMessage() { + // No-op. + } + + /** + * @param fileId File ID. + * @param id Request ID. + * @param err Error. + */ + public GridGgfsAckMessage(IgniteUuid fileId, long id, @Nullable IgniteCheckedException err) { + this.fileId = fileId; + this.id = id; + this.err = err; + } + + /** + * @return File ID. + */ + public IgniteUuid fileId() { + return fileId; + } + + /** + * @return Batch ID. + */ + public long id() { + return id; + } + + /** + * @return Error occurred when writing this batch, if any. + */ + public IgniteCheckedException error() { + return err; + } + + /** {@inheritDoc} */ + @Override public void prepareMarshal(IgniteMarshaller marsh) throws IgniteCheckedException { + super.prepareMarshal(marsh); + + if (err != null) + errBytes = marsh.marshal(err); + } + + /** {@inheritDoc} */ + @Override public void finishUnmarshal(IgniteMarshaller marsh, @Nullable ClassLoader ldr) throws IgniteCheckedException { + super.finishUnmarshal(marsh, ldr); + + if (errBytes != null) + err = marsh.unmarshal(errBytes, ldr); + } + + /** {@inheritDoc} */ + @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) + @Override public GridTcpCommunicationMessageAdapter clone() { + GridGgfsAckMessage _clone = new GridGgfsAckMessage(); + + clone0(_clone); + + return _clone; + } + + /** {@inheritDoc} */ + @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { + super.clone0(_msg); + + GridGgfsAckMessage _clone = (GridGgfsAckMessage)_msg; + + _clone.fileId = fileId; + _clone.id = id; + _clone.err = err; + _clone.errBytes = errBytes; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean writeTo(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!super.writeTo(buf)) + return false; + + if (!commState.typeWritten) { + if (!commState.putByte(directType())) + return false; + + commState.typeWritten = true; + } + + switch (commState.idx) { + case 0: + if (!commState.putByteArray(errBytes)) + return false; + + commState.idx++; + + case 1: + if (!commState.putGridUuid(fileId)) + return false; + + commState.idx++; + + case 2: + if (!commState.putLong(id)) + return false; + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean readFrom(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!super.readFrom(buf)) + return false; + + switch (commState.idx) { + case 0: + byte[] errBytes0 = commState.getByteArray(); + + if (errBytes0 == BYTE_ARR_NOT_READ) + return false; + + errBytes = errBytes0; + + commState.idx++; + + case 1: + IgniteUuid fileId0 = commState.getGridUuid(); + + if (fileId0 == GRID_UUID_NOT_READ) + return false; + + fileId = fileId0; + + commState.idx++; + + case 2: + if (buf.remaining() < 8) + return false; + + id = commState.getLong(); + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 65; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsAsyncImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsAsyncImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsAsyncImpl.java new file mode 100644 index 0000000..80154c0 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsAsyncImpl.java @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.fs; + +import org.apache.ignite.*; +import org.apache.ignite.fs.*; +import org.apache.ignite.fs.mapreduce.*; +import org.apache.ignite.lang.*; +import org.jetbrains.annotations.*; + +import java.net.*; +import java.util.*; + +/** + * Ggfs supporting asynchronous operations. + */ +public class GridGgfsAsyncImpl extends IgniteAsyncSupportAdapter implements GridGgfsEx { + /** */ + private final GridGgfsImpl ggfs; + + /** + * @param ggfs Ggfs. + */ + public GridGgfsAsyncImpl(GridGgfsImpl ggfs) { + super(true); + + this.ggfs = ggfs; + } + + /** {@inheritDoc} */ + @Override public IgniteFs enableAsync() { + return this; + } + + /** {@inheritDoc} */ + @Override public void format() throws IgniteCheckedException { + saveOrGet(ggfs.formatAsync()); + } + + /** {@inheritDoc} */ + @Override public <T, R> R execute(IgniteFsTask<T, R> task, @Nullable IgniteFsRecordResolver rslvr, + Collection<IgniteFsPath> paths, @Nullable T arg) throws IgniteCheckedException { + return saveOrGet(ggfs.executeAsync(task, rslvr, paths, arg)); + } + + /** {@inheritDoc} */ + @Override public <T, R> R execute(IgniteFsTask<T, R> task, @Nullable IgniteFsRecordResolver rslvr, + Collection<IgniteFsPath> paths, boolean skipNonExistentFiles, long maxRangeLen, @Nullable T arg) + throws IgniteCheckedException { + return saveOrGet(ggfs.executeAsync(task, rslvr, paths, skipNonExistentFiles, maxRangeLen, arg)); + } + + /** {@inheritDoc} */ + @Override public <T, R> R execute(Class<? extends IgniteFsTask<T, R>> taskCls, + @Nullable IgniteFsRecordResolver rslvr, Collection<IgniteFsPath> paths, @Nullable T arg) throws IgniteCheckedException { + return saveOrGet(ggfs.executeAsync(taskCls, rslvr, paths, arg)); + } + + /** {@inheritDoc} */ + @Override public <T, R> R execute(Class<? extends IgniteFsTask<T, R>> taskCls, + @Nullable IgniteFsRecordResolver rslvr, Collection<IgniteFsPath> paths, boolean skipNonExistentFiles, + long maxRangeLen, @Nullable T arg) throws IgniteCheckedException { + return saveOrGet(ggfs.executeAsync(taskCls, rslvr, paths, skipNonExistentFiles, maxRangeLen, arg)); + } + + /** {@inheritDoc} */ + @Override public void stop() { + ggfs.stop(); + } + + /** {@inheritDoc} */ + @Override public GridGgfsContext context() { + return ggfs.context(); + } + + /** {@inheritDoc} */ + @Override public GridGgfsPaths proxyPaths() { + return ggfs.proxyPaths(); + } + + /** {@inheritDoc} */ + @Override public GridGgfsInputStreamAdapter open(IgniteFsPath path, int bufSize, + int seqReadsBeforePrefetch) throws IgniteCheckedException { + return ggfs.open(path, bufSize, seqReadsBeforePrefetch); + } + + /** {@inheritDoc} */ + @Override public GridGgfsInputStreamAdapter open(IgniteFsPath path) throws IgniteCheckedException { + return ggfs.open(path); + } + + /** {@inheritDoc} */ + @Override public GridGgfsInputStreamAdapter open(IgniteFsPath path, int bufSize) throws IgniteCheckedException { + return ggfs.open(path, bufSize); + } + + /** {@inheritDoc} */ + @Override public GridGgfsStatus globalSpace() throws IgniteCheckedException { + return ggfs.globalSpace(); + } + + /** {@inheritDoc} */ + @Override public void globalSampling(@Nullable Boolean val) throws IgniteCheckedException { + ggfs.globalSampling(val); + } + + /** {@inheritDoc} */ + @Nullable @Override public Boolean globalSampling() { + return ggfs.globalSampling(); + } + + /** {@inheritDoc} */ + @Override public GridGgfsLocalMetrics localMetrics() { + return ggfs.localMetrics(); + } + + /** {@inheritDoc} */ + @Override public long groupBlockSize() { + return ggfs.groupBlockSize(); + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<?> awaitDeletesAsync() throws IgniteCheckedException { + return ggfs.awaitDeletesAsync(); + } + + /** {@inheritDoc} */ + @Nullable @Override public String clientLogDirectory() { + return ggfs.clientLogDirectory(); + } + + /** {@inheritDoc} */ + @Override public void clientLogDirectory(String logDir) { + ggfs.clientLogDirectory(logDir); + } + + /** {@inheritDoc} */ + @Override public boolean evictExclude(IgniteFsPath path, boolean primary) { + return ggfs.evictExclude(path, primary); + } + + /** {@inheritDoc} */ + @Override public IgniteUuid nextAffinityKey() { + return ggfs.nextAffinityKey(); + } + + /** {@inheritDoc} */ + @Override public boolean isProxy(URI path) { + return ggfs.isProxy(path); + } + + /** {@inheritDoc} */ + @Nullable @Override public String name() { + return ggfs.name(); + } + + /** {@inheritDoc} */ + @Override public IgniteFsConfiguration configuration() { + return ggfs.configuration(); + } + + /** {@inheritDoc} */ + @Override public IgniteFsPathSummary summary(IgniteFsPath path) throws IgniteCheckedException { + return ggfs.summary(path); + } + + /** {@inheritDoc} */ + @Override public IgniteFsOutputStream create(IgniteFsPath path, boolean overwrite) throws IgniteCheckedException { + return ggfs.create(path, overwrite); + } + + /** {@inheritDoc} */ + @Override public IgniteFsOutputStream create(IgniteFsPath path, int bufSize, boolean overwrite, int replication, + long blockSize, @Nullable Map<String, String> props) throws IgniteCheckedException { + return ggfs.create(path, bufSize, overwrite, replication, blockSize, props); + } + + /** {@inheritDoc} */ + @Override public IgniteFsOutputStream create(IgniteFsPath path, int bufSize, boolean overwrite, + @Nullable IgniteUuid affKey, int replication, long blockSize, @Nullable Map<String, String> props) + throws IgniteCheckedException { + return ggfs.create(path, bufSize, overwrite, affKey, replication, blockSize, props); + } + + /** {@inheritDoc} */ + @Override public IgniteFsOutputStream append(IgniteFsPath path, boolean create) throws IgniteCheckedException { + return ggfs.append(path, create); + } + + /** {@inheritDoc} */ + @Override public IgniteFsOutputStream append(IgniteFsPath path, int bufSize, boolean create, + @Nullable Map<String, String> props) throws IgniteCheckedException { + return ggfs.append(path, bufSize, create, props); + } + + /** {@inheritDoc} */ + @Override public void setTimes(IgniteFsPath path, long accessTime, long modificationTime) throws IgniteCheckedException { + ggfs.setTimes(path, accessTime, modificationTime); + } + + /** {@inheritDoc} */ + @Override public Collection<IgniteFsBlockLocation> affinity(IgniteFsPath path, long start, long len) + throws IgniteCheckedException { + return ggfs.affinity(path, start, len); + } + + /** {@inheritDoc} */ + @Override public Collection<IgniteFsBlockLocation> affinity(IgniteFsPath path, long start, long len, long maxLen) + throws IgniteCheckedException { + return ggfs.affinity(path, start, len, maxLen); + } + + /** {@inheritDoc} */ + @Override public IgniteFsMetrics metrics() throws IgniteCheckedException { + return ggfs.metrics(); + } + + /** {@inheritDoc} */ + @Override public void resetMetrics() throws IgniteCheckedException { + ggfs.resetMetrics(); + } + + /** {@inheritDoc} */ + @Override public long size(IgniteFsPath path) throws IgniteCheckedException { + return ggfs.size(path); + } + + /** {@inheritDoc} */ + @Override public boolean exists(IgniteFsPath path) throws IgniteCheckedException { + return ggfs.exists(path); + } + + /** {@inheritDoc} */ + @Nullable @Override public IgniteFsFile update(IgniteFsPath path, Map<String, String> props) throws IgniteCheckedException { + return ggfs.update(path, props); + } + + /** {@inheritDoc} */ + @Override public void rename(IgniteFsPath src, IgniteFsPath dest) throws IgniteCheckedException { + ggfs.rename(src, dest); + } + + /** {@inheritDoc} */ + @Override public boolean delete(IgniteFsPath path, boolean recursive) throws IgniteCheckedException { + return ggfs.delete(path, recursive); + } + + /** {@inheritDoc} */ + @Override public void mkdirs(IgniteFsPath path) throws IgniteCheckedException { + ggfs.mkdirs(path); + } + + /** {@inheritDoc} */ + @Override public void mkdirs(IgniteFsPath path, @Nullable Map<String, String> props) throws IgniteCheckedException { + ggfs.mkdirs(path, props); + } + + /** {@inheritDoc} */ + @Override public Collection<IgniteFsPath> listPaths(IgniteFsPath path) throws IgniteCheckedException { + return ggfs.listPaths(path); + } + + /** {@inheritDoc} */ + @Override public Collection<IgniteFsFile> listFiles(IgniteFsPath path) throws IgniteCheckedException { + return ggfs.listFiles(path); + } + + /** {@inheritDoc} */ + @Nullable @Override public IgniteFsFile info(IgniteFsPath path) throws IgniteCheckedException { + return ggfs.info(path); + } + + /** {@inheritDoc} */ + @Override public long usedSpaceSize() throws IgniteCheckedException { + return ggfs.usedSpaceSize(); + } + + /** {@inheritDoc} */ + @Override public Map<String, String> properties() { + return ggfs.properties(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsAttributes.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsAttributes.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsAttributes.java new file mode 100644 index 0000000..ba3dc09 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsAttributes.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.fs; + +import org.apache.ignite.fs.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; +import java.util.*; + +/** + * GGFS attributes. + * <p> + * This class contains information on a single GGFS configured on some node. + */ +public class GridGgfsAttributes implements Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** GGFS name. */ + private String ggfsName; + + /** File's data block size (bytes). */ + private int blockSize; + + /** Size of the group figured in {@link org.apache.ignite.fs.IgniteFsGroupDataBlocksKeyMapper}. */ + private int grpSize; + + /** Meta cache name. */ + private String metaCacheName; + + /** Data cache name. */ + private String dataCacheName; + + /** Default mode. */ + private IgniteFsMode dfltMode; + + /** Fragmentizer enabled flag. */ + private boolean fragmentizerEnabled; + + /** Path modes. */ + private Map<String, IgniteFsMode> pathModes; + + /** + * @param ggfsName GGFS name. + * @param blockSize File's data block size (bytes). + * @param grpSize Size of the group figured in {@link org.apache.ignite.fs.IgniteFsGroupDataBlocksKeyMapper}. + * @param metaCacheName Meta cache name. + * @param dataCacheName Data cache name. + * @param dfltMode Default mode. + * @param pathModes Path modes. + */ + public GridGgfsAttributes(String ggfsName, int blockSize, int grpSize, String metaCacheName, String dataCacheName, + IgniteFsMode dfltMode, Map<String, IgniteFsMode> pathModes, boolean fragmentizerEnabled) { + this.blockSize = blockSize; + this.ggfsName = ggfsName; + this.grpSize = grpSize; + this.metaCacheName = metaCacheName; + this.dataCacheName = dataCacheName; + this.dfltMode = dfltMode; + this.pathModes = pathModes; + this.fragmentizerEnabled = fragmentizerEnabled; + } + + /** + * Public no-arg constructor for {@link Externalizable}. + */ + public GridGgfsAttributes() { + // No-op. + } + + /** + * @return GGFS name. + */ + public String ggfsName() { + return ggfsName; + } + + /** + * @return File's data block size (bytes). + */ + public int blockSize() { + return blockSize; + } + + /** + * @return Size of the group figured in {@link org.apache.ignite.fs.IgniteFsGroupDataBlocksKeyMapper}. + */ + public int groupSize() { + return grpSize; + } + + /** + * @return Metadata cache name. + */ + public String metaCacheName() { + return metaCacheName; + } + + /** + * @return Data cache name. + */ + public String dataCacheName() { + return dataCacheName; + } + + /** + * @return Default mode. + */ + public IgniteFsMode defaultMode() { + return dfltMode; + } + + /** + * @return Path modes. + */ + public Map<String, IgniteFsMode> pathModes() { + return pathModes != null ? Collections.unmodifiableMap(pathModes) : null; + } + + /** + * @return {@code True} if fragmentizer is enabled. + */ + public boolean fragmentizerEnabled() { + return fragmentizerEnabled; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeString(out, ggfsName); + out.writeInt(blockSize); + out.writeInt(grpSize); + U.writeString(out, metaCacheName); + U.writeString(out, dataCacheName); + U.writeEnum0(out, dfltMode); + out.writeBoolean(fragmentizerEnabled); + + if (pathModes != null) { + out.writeBoolean(true); + + out.writeInt(pathModes.size()); + + for (Map.Entry<String, IgniteFsMode> pathMode : pathModes.entrySet()) { + U.writeString(out, pathMode.getKey()); + U.writeEnum0(out, pathMode.getValue()); + } + } + else + out.writeBoolean(false); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + ggfsName = U.readString(in); + blockSize = in.readInt(); + grpSize = in.readInt(); + metaCacheName = U.readString(in); + dataCacheName = U.readString(in); + dfltMode = IgniteFsMode.fromOrdinal(U.readEnumOrdinal0(in)); + fragmentizerEnabled = in.readBoolean(); + + if (in.readBoolean()) { + int size = in.readInt(); + + pathModes = new HashMap<>(size, 1.0f); + + for (int i = 0; i < size; i++) + pathModes.put(U.readString(in), IgniteFsMode.fromOrdinal(U.readEnumOrdinal0(in))); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsBlockKey.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsBlockKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsBlockKey.java new file mode 100644 index 0000000..84fcaa6 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsBlockKey.java @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.fs; + +import org.apache.ignite.lang.*; +import org.apache.ignite.internal.processors.task.*; +import org.apache.ignite.internal.util.direct.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.nio.*; + +/** + * File's binary data block key. + */ +@GridInternal +public final class GridGgfsBlockKey extends GridTcpCommunicationMessageAdapter + implements Externalizable, Comparable<GridGgfsBlockKey> { + /** */ + private static final long serialVersionUID = 0L; + + /** File system file ID. */ + private IgniteUuid fileId; + + /** Block ID. */ + private long blockId; + + /** Block affinity key. */ + private IgniteUuid affKey; + + /** Eviction exclude flag. */ + private boolean evictExclude; + + /** + * Constructs file's binary data block key. + * + * @param fileId File ID. + * @param affKey Affinity key. + * @param evictExclude Evict exclude flag. + * @param blockId Block ID. + */ + public GridGgfsBlockKey(IgniteUuid fileId, @Nullable IgniteUuid affKey, boolean evictExclude, long blockId) { + assert fileId != null; + assert blockId >= 0; + + this.fileId = fileId; + this.affKey = affKey; + this.evictExclude = evictExclude; + this.blockId = blockId; + } + + /** + * Empty constructor required for {@link Externalizable}. + */ + public GridGgfsBlockKey() { + // No-op. + } + + /** + * @return File ID. + */ + public IgniteUuid getFileId() { + return fileId; + } + + /** + * @return Block affinity key. + */ + public IgniteUuid affinityKey() { + return affKey; + } + + /** + * @return Evict exclude flag. + */ + public boolean evictExclude() { + return evictExclude; + } + + /** + * @return Block ID. + */ + public long getBlockId() { + return blockId; + } + + /** {@inheritDoc} */ + @Override public int compareTo(@NotNull GridGgfsBlockKey o) { + int res = fileId.compareTo(o.fileId); + + if (res != 0) + return res; + + long v1 = blockId; + long v2 = o.blockId; + + if (v1 != v2) + return v1 > v2 ? 1 : -1; + + if (affKey == null && o.affKey == null) + return 0; + + if (affKey != null && o.affKey != null) + return affKey.compareTo(o.affKey); + + return affKey != null ? -1 : 1; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeGridUuid(out, fileId); + U.writeGridUuid(out, affKey); + out.writeBoolean(evictExclude); + out.writeLong(blockId); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException { + fileId = U.readGridUuid(in); + affKey = U.readGridUuid(in); + evictExclude = in.readBoolean(); + blockId = in.readLong(); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return fileId.hashCode() + (int)(blockId ^ (blockId >>> 32)); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (o == this) + return true; + + if (o == null || o.getClass() != getClass()) + return false; + + GridGgfsBlockKey that = (GridGgfsBlockKey)o; + + return blockId == that.blockId && fileId.equals(that.fileId) && F.eq(affKey, that.affKey) && + evictExclude == that.evictExclude; + } + + /** {@inheritDoc} */ + @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) + @Override public GridTcpCommunicationMessageAdapter clone() { + GridGgfsBlockKey _clone = new GridGgfsBlockKey(); + + clone0(_clone); + + return _clone; + } + + /** {@inheritDoc} */ + @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { + GridGgfsBlockKey _clone = (GridGgfsBlockKey)_msg; + + _clone.fileId = fileId; + _clone.blockId = blockId; + _clone.affKey = affKey; + _clone.evictExclude = evictExclude; + } + + /** {@inheritDoc} */ + @SuppressWarnings("fallthrough") + @Override public boolean writeTo(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!commState.typeWritten) { + if (!commState.putByte(directType())) + return false; + + commState.typeWritten = true; + } + + switch (commState.idx) { + case 0: + if (!commState.putGridUuid(affKey)) + return false; + + commState.idx++; + + case 1: + if (!commState.putLong(blockId)) + return false; + + commState.idx++; + + case 2: + if (!commState.putBoolean(evictExclude)) + return false; + + commState.idx++; + + case 3: + if (!commState.putGridUuid(fileId)) + return false; + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @SuppressWarnings("fallthrough") + @Override public boolean readFrom(ByteBuffer buf) { + commState.setBuffer(buf); + + switch (commState.idx) { + case 0: + IgniteUuid affKey0 = commState.getGridUuid(); + + if (affKey0 == GRID_UUID_NOT_READ) + return false; + + affKey = affKey0; + + commState.idx++; + + case 1: + if (buf.remaining() < 8) + return false; + + blockId = commState.getLong(); + + commState.idx++; + + case 2: + if (buf.remaining() < 1) + return false; + + evictExclude = commState.getBoolean(); + + commState.idx++; + + case 3: + IgniteUuid fileId0 = commState.getGridUuid(); + + if (fileId0 == GRID_UUID_NOT_READ) + return false; + + fileId = fileId0; + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 66; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridGgfsBlockKey.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsBlockLocationImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsBlockLocationImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsBlockLocationImpl.java new file mode 100644 index 0000000..2a5fea3 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsBlockLocationImpl.java @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.fs; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.fs.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; +import java.net.*; +import java.util.*; + +/** + * File block location in the grid. + */ +public class GridGgfsBlockLocationImpl implements IgniteFsBlockLocation, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private long start; + + /** */ + private long len; + + /** */ + @GridToStringInclude + private Collection<UUID> nodeIds; + + /** */ + private Collection<String> names; + + /** */ + private Collection<String> hosts; + + /** + * Empty constructor for externalizable. + */ + public GridGgfsBlockLocationImpl() { + // No-op. + } + + /** + * @param location HDFS block location. + * @param len New length. + */ + public GridGgfsBlockLocationImpl(IgniteFsBlockLocation location, long len) { + assert location != null; + + start = location.start(); + this.len = len; + + nodeIds = location.nodeIds(); + names = location.names(); + hosts = location.hosts(); + } + + /** + * @param start Start. + * @param len Length. + * @param nodes Affinity nodes. + */ + public GridGgfsBlockLocationImpl(long start, long len, Collection<ClusterNode> nodes) { + assert start >= 0; + assert len > 0; + assert nodes != null && !nodes.isEmpty(); + + this.start = start; + this.len = len; + + convertFromNodes(nodes); + } + + /** + * @return Start position. + */ + @Override public long start() { + return start; + } + + /** + * @return Length. + */ + @Override public long length() { + return len; + } + + /** + * @return Node IDs. + */ + @Override public Collection<UUID> nodeIds() { + return nodeIds; + } + + /** {@inheritDoc} */ + @Override public Collection<String> names() { + return names; + } + + /** {@inheritDoc} */ + @Override public Collection<String> hosts() { + return hosts; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = (int)(start ^ (start >>> 32)); + + res = 31 * res + (int)(len ^ (len >>> 32)); + res = 31 * res + nodeIds.hashCode(); + + return res; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (o == this) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + GridGgfsBlockLocationImpl that = (GridGgfsBlockLocationImpl)o; + + return len == that.len && start == that.start && F.eq(nodeIds, that.nodeIds) && F.eq(names, that.names) && + F.eq(hosts, that.hosts); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridGgfsBlockLocationImpl.class, this); + } + + /** + * Writes this object to data output. Note that this is not externalizable + * interface because we want to eliminate any marshaller. + * + * @param out Data output to write. + * @throws IOException If write failed. + */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + assert names != null; + assert hosts != null; + + out.writeLong(start); + out.writeLong(len); + + out.writeBoolean(nodeIds != null); + + if (nodeIds != null) { + out.writeInt(nodeIds.size()); + + for (UUID nodeId : nodeIds) + U.writeUuid(out, nodeId); + } + + out.writeInt(names.size()); + + for (String name : names) + out.writeUTF(name); + + out.writeInt(hosts.size()); + + for (String host : hosts) + out.writeUTF(host); + } + + /** + * Reads object from data input. Note we do not use externalizable interface + * to eliminate marshaller. + * + * @param in Data input. + * @throws IOException If read failed. + */ + @Override public void readExternal(ObjectInput in) throws IOException { + start = in.readLong(); + len = in.readLong(); + + int size; + + if (in.readBoolean()) { + size = in.readInt(); + + nodeIds = new ArrayList<>(size); + + for (int i = 0; i < size; i++) + nodeIds.add(U.readUuid(in)); + } + + size = in.readInt(); + + names = new ArrayList<>(size); + + for (int i = 0; i < size; i++) + names.add(in.readUTF()); + + size = in.readInt(); + + hosts = new ArrayList<>(size); + + for (int i = 0; i < size; i++) + hosts.add(in.readUTF()); + } + + /** + * Converts collection of rich nodes to block location data. + * + * @param nodes Collection of affinity nodes. + */ + private void convertFromNodes(Collection<ClusterNode> nodes) { + Collection<String> names = new LinkedHashSet<>(); + Collection<String> hosts = new LinkedHashSet<>(); + Collection<UUID> nodeIds = new ArrayList<>(nodes.size()); + + for (final ClusterNode node : nodes) { + // Normalize host names into Hadoop-expected format. + try { + Collection<InetAddress> addrs = U.toInetAddresses(node); + + for (InetAddress addr : addrs) { + if (addr.getHostName() == null) + names.add(addr.getHostAddress() + ":" + 9001); + else { + names.add(addr.getHostName() + ":" + 9001); // hostname:portNumber + hosts.add(addr.getHostName()); + } + } + } + catch (IgniteCheckedException ignored) { + names.addAll(node.addresses()); + } + + nodeIds.add(node.id()); + } + + this.nodeIds = nodeIds; + this.names = names; + this.hosts = hosts; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsBlocksMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsBlocksMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsBlocksMessage.java new file mode 100644 index 0000000..2d90e86 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsBlocksMessage.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.fs; + +import org.apache.ignite.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.internal.util.direct.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; +import java.nio.*; +import java.util.*; + +/** + * GGFS write blocks message. + */ +public class GridGgfsBlocksMessage extends GridGgfsCommunicationMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** File id. */ + private IgniteUuid fileId; + + /** Batch id */ + private long id; + + /** Blocks to store. */ + @GridDirectMap(keyType = GridGgfsBlockKey.class, valueType = byte[].class) + private Map<GridGgfsBlockKey, byte[]> blocks; + + /** + * Empty constructor required by {@link Externalizable} + */ + public GridGgfsBlocksMessage() { + // No-op. + } + + /** + * Constructor. + * + * @param fileId File ID. + * @param id Message id. + * @param blocks Blocks to put in cache. + */ + public GridGgfsBlocksMessage(IgniteUuid fileId, long id, Map<GridGgfsBlockKey, byte[]> blocks) { + this.fileId = fileId; + this.id = id; + this.blocks = blocks; + } + + /** + * @return File id. + */ + public IgniteUuid fileId() { + return fileId; + } + + /** + * @return Batch id. + */ + public long id() { + return id; + } + + /** + * @return Map of blocks to put in cache. + */ + public Map<GridGgfsBlockKey, byte[]> blocks() { + return blocks; + } + + /** {@inheritDoc} */ + @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) + @Override public GridTcpCommunicationMessageAdapter clone() { + GridGgfsBlocksMessage _clone = new GridGgfsBlocksMessage(); + + clone0(_clone); + + return _clone; + } + + /** {@inheritDoc} */ + @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { + super.clone0(_msg); + + GridGgfsBlocksMessage _clone = (GridGgfsBlocksMessage)_msg; + + _clone.fileId = fileId; + _clone.id = id; + _clone.blocks = blocks; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean writeTo(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!super.writeTo(buf)) + return false; + + if (!commState.typeWritten) { + if (!commState.putByte(directType())) + return false; + + commState.typeWritten = true; + } + + switch (commState.idx) { + case 0: + if (blocks != null) { + if (commState.it == null) { + if (!commState.putInt(blocks.size())) + return false; + + commState.it = blocks.entrySet().iterator(); + } + + while (commState.it.hasNext() || commState.cur != NULL) { + if (commState.cur == NULL) + commState.cur = commState.it.next(); + + Map.Entry<GridGgfsBlockKey, byte[]> e = (Map.Entry<GridGgfsBlockKey, byte[]>)commState.cur; + + if (!commState.keyDone) { + if (!commState.putMessage(e.getKey())) + return false; + + commState.keyDone = true; + } + + if (!commState.putByteArray(e.getValue())) + return false; + + commState.keyDone = false; + + commState.cur = NULL; + } + + commState.it = null; + } else { + if (!commState.putInt(-1)) + return false; + } + + commState.idx++; + + case 1: + if (!commState.putGridUuid(fileId)) + return false; + + commState.idx++; + + case 2: + if (!commState.putLong(id)) + return false; + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean readFrom(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!super.readFrom(buf)) + return false; + + switch (commState.idx) { + case 0: + if (commState.readSize == -1) { + if (buf.remaining() < 4) + return false; + + commState.readSize = commState.getInt(); + } + + if (commState.readSize >= 0) { + if (blocks == null) + blocks = U.newHashMap(commState.readSize); + + for (int i = commState.readItems; i < commState.readSize; i++) { + if (!commState.keyDone) { + Object _val = commState.getMessage(); + + if (_val == MSG_NOT_READ) + return false; + + commState.cur = _val; + commState.keyDone = true; + } + + byte[] _val = commState.getByteArray(); + + if (_val == BYTE_ARR_NOT_READ) + return false; + + blocks.put((GridGgfsBlockKey)commState.cur, _val); + + commState.keyDone = false; + + commState.readItems++; + } + } + + commState.readSize = -1; + commState.readItems = 0; + commState.cur = null; + + commState.idx++; + + case 1: + IgniteUuid fileId0 = commState.getGridUuid(); + + if (fileId0 == GRID_UUID_NOT_READ) + return false; + + fileId = fileId0; + + commState.idx++; + + case 2: + if (buf.remaining() < 8) + return false; + + id = commState.getLong(); + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 67; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsClientSession.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsClientSession.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsClientSession.java new file mode 100644 index 0000000..3ac4e5b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsClientSession.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.fs; + +import org.jdk8.backport.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; + +/** + * GGFS client session. Effectively used to manage lifecycle of opened resources and close them on + * connection close. + */ +public class GridGgfsClientSession { + /** Session resources. */ + private ConcurrentMap<Long, Closeable> rsrcMap = new ConcurrentHashMap8<>(); + + /** + * Registers resource within this session. + * + * @param rsrcId Resource id. + * @param rsrc Resource to register. + */ + public boolean registerResource(long rsrcId, Closeable rsrc) { + Object old = rsrcMap.putIfAbsent(rsrcId, rsrc); + + return old == null; + } + + /** + * Gets registered resource by ID. + * + * @param rsrcId Resource ID. + * @return Resource or {@code null} if resource was not found. + */ + @Nullable public <T> T resource(Long rsrcId) { + return (T)rsrcMap.get(rsrcId); + } + + /** + * Unregister previously registered resource. + * + * @param rsrcId Resource ID. + * @param rsrc Resource to unregister. + * @return {@code True} if resource was unregistered, {@code false} if no resource + * is associated with this ID or other resource is associated with this ID. + */ + public boolean unregisterResource(Long rsrcId, Closeable rsrc) { + return rsrcMap.remove(rsrcId, rsrc); + } + + /** + * @return Registered resources iterator. + */ + public Iterator<Closeable> registeredResources() { + return rsrcMap.values().iterator(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsCommunicationMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsCommunicationMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsCommunicationMessage.java new file mode 100644 index 0000000..b4bedd8 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsCommunicationMessage.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.fs; + +import org.apache.ignite.*; +import org.apache.ignite.marshaller.*; +import org.apache.ignite.internal.util.direct.*; +import org.jetbrains.annotations.*; + +import java.nio.*; + +/** + * Base class for all GGFS communication messages sent between nodes. + */ +public abstract class GridGgfsCommunicationMessage extends GridTcpCommunicationMessageAdapter { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { + } + + /** + * @param marsh Marshaller. + * @throws IgniteCheckedException In case of error. + */ + public void prepareMarshal(IgniteMarshaller marsh) throws IgniteCheckedException { + // No-op. + } + + /** + * @param marsh Marshaller. + * @param ldr Class loader. + * @throws IgniteCheckedException In case of error. + */ + public void finishUnmarshal(IgniteMarshaller marsh, @Nullable ClassLoader ldr) throws IgniteCheckedException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!commState.typeWritten) { + if (!commState.putByte(directType())) + return false; + + commState.typeWritten = true; + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf) { + commState.setBuffer(buf); + + return true; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsContext.java new file mode 100644 index 0000000..6057e7a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsContext.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.fs; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.fs.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.managers.communication.*; +import org.apache.ignite.internal.util.typedef.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +import static org.apache.ignite.internal.GridNodeAttributes.*; + +/** + * GGFS context holding all required components for GGFS instance. + */ +public class GridGgfsContext { + /** Kernal context. */ + private final GridKernalContext ctx; + + /** Configuration. */ + private final IgniteFsConfiguration cfg; + + /** Managers. */ + private List<GridGgfsManager> mgrs = new LinkedList<>(); + + /** Meta manager. */ + private final GridGgfsMetaManager metaMgr; + + /** Data manager. */ + private final GridGgfsDataManager dataMgr; + + /** Server manager. */ + private final GridGgfsServerManager srvMgr; + + /** Fragmentizer manager. */ + private final GridGgfsFragmentizerManager fragmentizerMgr; + + /** GGFS instance. */ + private final GridGgfsEx ggfs; + + /** + * @param ctx Kernal context. + * @param cfg GGFS configuration. + * @param metaMgr Meta manager. + * @param dataMgr Data manager. + * @param srvMgr Server manager. + * @param fragmentizerMgr Fragmentizer manager. + * @throws IgniteCheckedException If GGFs context instantiation is failed. + */ + public GridGgfsContext( + GridKernalContext ctx, + IgniteFsConfiguration cfg, + GridGgfsMetaManager metaMgr, + GridGgfsDataManager dataMgr, + GridGgfsServerManager srvMgr, + GridGgfsFragmentizerManager fragmentizerMgr + ) throws IgniteCheckedException { + this.ctx = ctx; + this.cfg = cfg; + + this.metaMgr = add(metaMgr); + this.dataMgr = add(dataMgr); + this.srvMgr = add(srvMgr); + this.fragmentizerMgr = add(fragmentizerMgr); + + ggfs = new GridGgfsImpl(this); + } + + /** + * @return GGFS instance. + */ + public GridGgfsEx ggfs() { + return ggfs; + } + + /** + * @return Kernal context. + */ + public GridKernalContext kernalContext() { + return ctx; + } + + /** + * @return GGFS configuration. + */ + public IgniteFsConfiguration configuration() { + return cfg; + } + + /** + * @return List of managers, in starting order. + */ + public List<GridGgfsManager> managers() { + return mgrs; + } + + /** + * @return Meta manager. + */ + public GridGgfsMetaManager meta() { + return metaMgr; + } + + /** + * @return Data manager. + */ + public GridGgfsDataManager data() { + return dataMgr; + } + + /** + * @return Server manager. + */ + public GridGgfsServerManager server() { + return srvMgr; + } + + /** + * @return Fragmentizer manager. + */ + public GridGgfsFragmentizerManager fragmentizer() { + return fragmentizerMgr; + } + + /** + * @param nodeId Node ID. + * @param topic Topic. + * @param msg Message. + * @param plc Policy. + * @throws IgniteCheckedException In case of error. + */ + public void send(UUID nodeId, Object topic, GridGgfsCommunicationMessage msg, GridIoPolicy plc) + throws IgniteCheckedException { + if (!kernalContext().localNodeId().equals(nodeId)) + msg.prepareMarshal(kernalContext().config().getMarshaller()); + + kernalContext().io().send(nodeId, topic, msg, plc); + } + + /** + * @param node Node. + * @param topic Topic. + * @param msg Message. + * @param plc Policy. + * @throws IgniteCheckedException In case of error. + */ + public void send(ClusterNode node, Object topic, GridGgfsCommunicationMessage msg, GridIoPolicy plc) + throws IgniteCheckedException { + if (!kernalContext().localNodeId().equals(node.id())) + msg.prepareMarshal(kernalContext().config().getMarshaller()); + + kernalContext().io().send(node, topic, msg, plc); + } + + /** + * Checks if given node is a GGFS node. + * + * @param node Node to check. + * @return {@code True} if node has GGFS with this name, {@code false} otherwise. + */ + public boolean ggfsNode(ClusterNode node) { + assert node != null; + + GridGgfsAttributes[] ggfs = node.attribute(ATTR_GGFS); + + if (ggfs != null) + for (GridGgfsAttributes attrs : ggfs) + if (F.eq(cfg.getName(), attrs.ggfsName())) + return true; + + return false; + } + + /** + * Adds manager to managers list. + * + * @param mgr Manager. + * @return Added manager. + */ + private <T extends GridGgfsManager> T add(@Nullable T mgr) { + if (mgr != null) + mgrs.add(mgr); + + return mgr; + } +}
