http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsModeResolver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsModeResolver.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsModeResolver.java deleted file mode 100644 index 374bcfd..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsModeResolver.java +++ /dev/null @@ -1,177 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.ggfs; - -import org.apache.ignite.fs.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.typedef.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -/** - * - */ -public class GridGgfsModeResolver { - /** Maximum size of map with cached path modes. */ - private static final int MAX_PATH_CACHE = 1000; - - /** Default mode. */ - private final IgniteFsMode dfltMode; - - /** Modes for particular paths. Ordered from longest to shortest. */ - private ArrayList<T2<IgniteFsPath, IgniteFsMode>> modes; - - /** Cached modes per path. */ - private Map<IgniteFsPath, IgniteFsMode> modesCache; - - /** Cached children modes per path. */ - private Map<IgniteFsPath, Set<IgniteFsMode>> childrenModesCache; - - /** - * @param dfltMode Default GGFS mode. - * @param modes List of configured modes. - */ - public GridGgfsModeResolver(IgniteFsMode dfltMode, @Nullable List<T2<IgniteFsPath, IgniteFsMode>> modes) { - assert dfltMode != null; - - this.dfltMode = dfltMode; - - if (modes != null) { - ArrayList<T2<IgniteFsPath, IgniteFsMode>> modes0 = new ArrayList<>(modes); - - // Sort paths, longest first. - Collections.sort(modes0, new Comparator<Map.Entry<IgniteFsPath, IgniteFsMode>>() { - @Override public int compare(Map.Entry<IgniteFsPath, IgniteFsMode> o1, - Map.Entry<IgniteFsPath, IgniteFsMode> o2) { - return o2.getKey().components().size() - o1.getKey().components().size(); - } - }); - - this.modes = modes0; - - modesCache = new GridBoundedConcurrentLinkedHashMap<>(MAX_PATH_CACHE); - childrenModesCache = new GridBoundedConcurrentLinkedHashMap<>(MAX_PATH_CACHE); - } - } - - /** - * Resolves GGFS mode for the given path. - * - * @param path GGFS path. - * @return GGFS mode. - */ - public IgniteFsMode resolveMode(IgniteFsPath path) { - assert path != null; - - if (modes == null) - return dfltMode; - else { - IgniteFsMode mode = modesCache.get(path); - - if (mode == null) { - for (T2<IgniteFsPath, IgniteFsMode> entry : modes) { - if (startsWith(path, entry.getKey())) { - // As modes ordered from most specific to least specific first mode found is ours. - mode = entry.getValue(); - - break; - } - } - - if (mode == null) - mode = dfltMode; - - modesCache.put(path, mode); - } - - return mode; - } - } - - /** - * @param path Path. - * @return Set of all modes that children paths could have. - */ - public Set<IgniteFsMode> resolveChildrenModes(IgniteFsPath path) { - assert path != null; - - if (modes == null) - return Collections.singleton(dfltMode); - else { - Set<IgniteFsMode> children = childrenModesCache.get(path); - - if (children == null) { - children = new HashSet<>(IgniteFsMode.values().length, 1.0f); - - IgniteFsMode pathDefault = dfltMode; - - for (T2<IgniteFsPath, IgniteFsMode> child : modes) { - if (startsWith(path, child.getKey())) { - pathDefault = child.getValue(); - - break; - } - else if (startsWith(child.getKey(), path)) - children.add(child.getValue()); - } - - children.add(pathDefault); - - childrenModesCache.put(path, children); - } - - return children; - } - } - - /** - * @return Unmodifiable copy of properly ordered modes prefixes - * or {@code null} if no modes set. - */ - @Nullable public List<T2<IgniteFsPath, IgniteFsMode>> modesOrdered() { - return modes != null ? Collections.unmodifiableList(modes) : null; - } - - /** - * Check if path starts with prefix. - * - * @param path Path. - * @param prefix Prefix. - * @return {@code true} if path starts with prefix, {@code false} if not. - */ - private static boolean startsWith(IgniteFsPath path, IgniteFsPath prefix) { - List<String> p1Comps = path.components(); - List<String> p2Comps = prefix.components(); - - if (p2Comps.size() > p1Comps.size()) - return false; - - for (int i = 0; i < p1Comps.size(); i++) { - if (i >= p2Comps.size() || p2Comps.get(i) == null) - // All prefix components already matched. - return true; - - if (!p1Comps.get(i).equals(p2Comps.get(i))) - return false; - } - - // Path and prefix components had same length and all of them matched. - return true; - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsPaths.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsPaths.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsPaths.java deleted file mode 100644 index 8c011ee..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsPaths.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.ggfs; - -import org.apache.ignite.fs.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; - -/** - * Description of path modes. - */ -public class GridGgfsPaths implements Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Additional secondary file system properties. */ - private Map<String, String> props; - - /** Default GGFS mode. */ - private IgniteFsMode dfltMode; - - /** Path modes. */ - private List<T2<IgniteFsPath, IgniteFsMode>> pathModes; - - /** - * Empty constructor required by {@link Externalizable}. - */ - public GridGgfsPaths() { - // No-op. - } - - /** - * Constructor. - * - * @param props Additional secondary file system properties. - * @param dfltMode Default GGFS mode. - * @param pathModes Path modes. - */ - public GridGgfsPaths(Map<String, String> props, IgniteFsMode dfltMode, @Nullable List<T2<IgniteFsPath, - IgniteFsMode>> pathModes) { - this.props = props; - this.dfltMode = dfltMode; - this.pathModes = pathModes; - } - - /** - * @return Secondary file system properties. - */ - public Map<String, String> properties() { - return props; - } - - /** - * @return Default GGFS mode. - */ - public IgniteFsMode defaultMode() { - return dfltMode; - } - - /** - * @return Path modes. - */ - @Nullable public List<T2<IgniteFsPath, IgniteFsMode>> pathModes() { - return pathModes; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeStringMap(out, props); - U.writeEnum0(out, dfltMode); - - if (pathModes != null) { - out.writeBoolean(true); - out.writeInt(pathModes.size()); - - for (T2<IgniteFsPath, IgniteFsMode> pathMode : pathModes) { - pathMode.getKey().writeExternal(out); - U.writeEnum0(out, pathMode.getValue()); - } - } - else - out.writeBoolean(false); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - props = U.readStringMap(in); - dfltMode = IgniteFsMode.fromOrdinal(U.readEnumOrdinal0(in)); - - if (in.readBoolean()) { - int size = in.readInt(); - - pathModes = new ArrayList<>(size); - - for (int i = 0; i < size; i++) { - IgniteFsPath path = new IgniteFsPath(); - path.readExternal(in); - - T2<IgniteFsPath, IgniteFsMode> entry = new T2<>(path, IgniteFsMode.fromOrdinal(U.readEnumOrdinal0(in))); - - pathModes.add(entry); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsProcessor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsProcessor.java deleted file mode 100644 index d1aa776..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsProcessor.java +++ /dev/null @@ -1,463 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.ggfs; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.cache.affinity.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.compute.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.fs.*; -import org.apache.ignite.fs.mapreduce.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.lang.*; -import org.gridgain.grid.kernal.processors.cache.*; -import org.apache.ignite.internal.processors.license.*; -import org.apache.ignite.internal.util.direct.*; -import org.apache.ignite.internal.util.ipc.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jdk8.backport.*; -import org.jetbrains.annotations.*; - -import java.util.*; -import java.util.concurrent.*; - -import static org.apache.ignite.IgniteSystemProperties.*; -import static org.apache.ignite.cache.GridCacheMemoryMode.*; -import static org.apache.ignite.cache.GridCacheMode.*; -import static org.apache.ignite.fs.IgniteFsMode.*; -import static org.apache.ignite.internal.GridNodeAttributes.*; -import static org.apache.ignite.internal.processors.license.GridLicenseSubsystem.*; - -/** - * Fully operational GGFS processor. - */ -public class GridGgfsProcessor extends GridGgfsProcessorAdapter { - /** Null GGFS name. */ - private static final String NULL_NAME = UUID.randomUUID().toString(); - - /** Converts context to GGFS. */ - private static final IgniteClosure<GridGgfsContext,IgniteFs> CTX_TO_GGFS = new C1<GridGgfsContext, IgniteFs>() { - @Override public IgniteFs apply(GridGgfsContext ggfsCtx) { - return ggfsCtx.ggfs(); - } - }; - - /** */ - private final ConcurrentMap<String, GridGgfsContext> ggfsCache = - new ConcurrentHashMap8<>(); - - /** - * @param ctx Kernal context. - */ - public GridGgfsProcessor(GridKernalContext ctx) { - super(ctx); - } - - /** {@inheritDoc} */ - @Override public void start() throws IgniteCheckedException { - if (ctx.config().isDaemon()) - return; - - IgniteFsConfiguration[] cfgs = ctx.config().getGgfsConfiguration(); - - assert cfgs != null && cfgs.length > 0; - - // Register GGFS messages. - GridTcpCommunicationMessageFactory.registerCommon(new GridTcpCommunicationMessageProducer() { - @Override - public GridTcpCommunicationMessageAdapter create(byte type) { - switch (type) { - case 65: - return new GridGgfsAckMessage(); - - case 66: - return new GridGgfsBlockKey(); - - case 67: - return new GridGgfsBlocksMessage(); - - case 68: - return new GridGgfsDeleteMessage(); - - case 69: - return new GridGgfsFileAffinityRange(); - - case 70: - return new GridGgfsFragmentizerRequest(); - - case 71: - return new GridGgfsFragmentizerResponse(); - - case 72: - return new GridGgfsSyncMessage(); - - default: - assert false : "Invalid GGFS message type."; - - return null; - } - } - }, 65, 66, 67, 68, 69,70, 71, 72); - - // Register HDFS edition usage with license manager. - GridLicenseUseRegistry.onUsage(HADOOP, getClass()); - - validateLocalGgfsConfigurations(cfgs); - - // Start GGFS instances. - for (IgniteFsConfiguration cfg : cfgs) { - GridGgfsContext ggfsCtx = new GridGgfsContext( - ctx, - new IgniteFsConfiguration(cfg), - new GridGgfsMetaManager(), - new GridGgfsDataManager(), - new GridGgfsServerManager(), - new GridGgfsFragmentizerManager()); - - // Start managers first. - for (GridGgfsManager mgr : ggfsCtx.managers()) - mgr.start(ggfsCtx); - - ggfsCache.put(maskName(cfg.getName()), ggfsCtx); - } - - if (log.isDebugEnabled()) - log.debug("GGFS processor started."); - } - - /** {@inheritDoc} */ - @Override public void onKernalStart() throws IgniteCheckedException { - if (ctx.config().isDaemon()) - return; - - if (!getBoolean(GG_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) { - for (ClusterNode n : ctx.discovery().remoteNodes()) - checkGgfsOnRemoteNode(n); - } - - for (GridGgfsContext ggfsCtx : ggfsCache.values()) - for (GridGgfsManager mgr : ggfsCtx.managers()) - mgr.onKernalStart(); - } - - /** {@inheritDoc} */ - @Override public void stop(boolean cancel) { - // Stop GGFS instances. - for (GridGgfsContext ggfsCtx : ggfsCache.values()) { - if (log.isDebugEnabled()) - log.debug("Stopping ggfs: " + ggfsCtx.configuration().getName()); - - List<GridGgfsManager> mgrs = ggfsCtx.managers(); - - for (ListIterator<GridGgfsManager> it = mgrs.listIterator(mgrs.size()); it.hasPrevious();) { - GridGgfsManager mgr = it.previous(); - - mgr.stop(cancel); - } - - ggfsCtx.ggfs().stop(); - } - - ggfsCache.clear(); - - if (log.isDebugEnabled()) - log.debug("GGFS processor stopped."); - } - - /** {@inheritDoc} */ - @Override public void onKernalStop(boolean cancel) { - for (GridGgfsContext ggfsCtx : ggfsCache.values()) { - if (log.isDebugEnabled()) - log.debug("Stopping ggfs: " + ggfsCtx.configuration().getName()); - - List<GridGgfsManager> mgrs = ggfsCtx.managers(); - - for (ListIterator<GridGgfsManager> it = mgrs.listIterator(mgrs.size()); it.hasPrevious();) { - GridGgfsManager mgr = it.previous(); - - mgr.onKernalStop(cancel); - } - } - - if (log.isDebugEnabled()) - log.debug("Finished executing GGFS processor onKernalStop() callback."); - } - - /** {@inheritDoc} */ - @Override public void printMemoryStats() { - X.println(">>>"); - X.println(">>> GGFS processor memory stats [grid=" + ctx.gridName() + ']'); - X.println(">>> ggfsCacheSize: " + ggfsCache.size()); - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public Collection<IgniteFs> ggfss() { - return F.viewReadOnly(ggfsCache.values(), CTX_TO_GGFS); - } - - /** {@inheritDoc} */ - @Override @Nullable public IgniteFs ggfs(@Nullable String name) { - GridGgfsContext ggfsCtx = ggfsCache.get(maskName(name)); - - return ggfsCtx == null ? null : ggfsCtx.ggfs(); - } - - /** {@inheritDoc} */ - @Override @Nullable public Collection<GridIpcServerEndpoint> endpoints(@Nullable String name) { - GridGgfsContext ggfsCtx = ggfsCache.get(maskName(name)); - - return ggfsCtx == null ? Collections.<GridIpcServerEndpoint>emptyList() : ggfsCtx.server().endpoints(); - } - - /** {@inheritDoc} */ - @Nullable @Override public ComputeJob createJob(IgniteFsJob job, @Nullable String ggfsName, IgniteFsPath path, - long start, long length, IgniteFsRecordResolver recRslv) { - return new GridGgfsJobImpl(job, ggfsName, path, start, length, recRslv); - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public void addAttributes(Map<String, Object> attrs) throws IgniteCheckedException { - super.addAttributes(attrs); - - IgniteConfiguration gridCfg = ctx.config(); - - // Node doesn't have GGFS if it: - // is daemon; - // doesn't have configured GGFS; - // doesn't have configured caches. - if (gridCfg.isDaemon() || F.isEmpty(gridCfg.getGgfsConfiguration()) || - F.isEmpty(gridCfg.getCacheConfiguration())) - return; - - final Map<String, CacheConfiguration> cacheCfgs = new HashMap<>(); - - F.forEach(gridCfg.getCacheConfiguration(), new CI1<CacheConfiguration>() { - @Override public void apply(CacheConfiguration c) { - cacheCfgs.put(c.getName(), c); - } - }); - - Collection<GridGgfsAttributes> attrVals = new ArrayList<>(); - - assert gridCfg.getGgfsConfiguration() != null; - - for (IgniteFsConfiguration ggfsCfg : gridCfg.getGgfsConfiguration()) { - CacheConfiguration cacheCfg = cacheCfgs.get(ggfsCfg.getDataCacheName()); - - if (cacheCfg == null) - continue; // No cache for the given GGFS configuration. - - GridCacheAffinityKeyMapper affMapper = cacheCfg.getAffinityMapper(); - - if (!(affMapper instanceof IgniteFsGroupDataBlocksKeyMapper)) - // Do not create GGFS attributes for such a node nor throw error about invalid configuration. - // Configuration will be validated later, while starting GridGgfsProcessor. - continue; - - attrVals.add(new GridGgfsAttributes( - ggfsCfg.getName(), - ggfsCfg.getBlockSize(), - ((IgniteFsGroupDataBlocksKeyMapper)affMapper).groupSize(), - ggfsCfg.getMetaCacheName(), - ggfsCfg.getDataCacheName(), - ggfsCfg.getDefaultMode(), - ggfsCfg.getPathModes(), - ggfsCfg.isFragmentizerEnabled())); - } - - attrs.put(ATTR_GGFS, attrVals.toArray(new GridGgfsAttributes[attrVals.size()])); - } - - /** - * @param name Cache name. - * @return Masked name accounting for {@code nulls}. - */ - private String maskName(@Nullable String name) { - return name == null ? NULL_NAME : name; - } - - /** - * Validates local GGFS configurations. Compares attributes only for GGFSes with same name. - * @param cfgs GGFS configurations - * @throws IgniteCheckedException If any of GGFS configurations is invalid. - */ - private void validateLocalGgfsConfigurations(IgniteFsConfiguration[] cfgs) throws IgniteCheckedException { - Collection<String> cfgNames = new HashSet<>(); - - for (IgniteFsConfiguration cfg : cfgs) { - String name = cfg.getName(); - - if (cfgNames.contains(name)) - throw new IgniteCheckedException("Duplicate GGFS name found (check configuration and " + - "assign unique name to each): " + name); - - GridCacheAdapter<Object, Object> dataCache = ctx.cache().internalCache(cfg.getDataCacheName()); - - if (dataCache == null) - throw new IgniteCheckedException("Data cache is not configured locally for GGFS: " + cfg); - - if (dataCache.configuration().isQueryIndexEnabled()) - throw new IgniteCheckedException("GGFS data cache cannot start with enabled query indexing."); - - GridCache<Object, Object> metaCache = ctx.cache().cache(cfg.getMetaCacheName()); - - if (metaCache == null) - throw new IgniteCheckedException("Metadata cache is not configured locally for GGFS: " + cfg); - - if (metaCache.configuration().isQueryIndexEnabled()) - throw new IgniteCheckedException("GGFS metadata cache cannot start with enabled query indexing."); - - if (F.eq(cfg.getDataCacheName(), cfg.getMetaCacheName())) - throw new IgniteCheckedException("Cannot use same cache as both data and meta cache: " + cfg.getName()); - - if (!(dataCache.configuration().getAffinityMapper() instanceof IgniteFsGroupDataBlocksKeyMapper)) - throw new IgniteCheckedException("Invalid GGFS data cache configuration (key affinity mapper class should be " + - IgniteFsGroupDataBlocksKeyMapper.class.getSimpleName() + "): " + cfg); - - long maxSpaceSize = cfg.getMaxSpaceSize(); - - if (maxSpaceSize > 0) { - // Max space validation. - long maxHeapSize = Runtime.getRuntime().maxMemory(); - long offHeapSize = dataCache.configuration().getOffHeapMaxMemory(); - - if (offHeapSize < 0 && maxSpaceSize > maxHeapSize) - // Offheap is disabled. - throw new IgniteCheckedException("Maximum GGFS space size cannot be greater that size of available heap " + - "memory [maxHeapSize=" + maxHeapSize + ", maxGgfsSpaceSize=" + maxSpaceSize + ']'); - else if (offHeapSize > 0 && maxSpaceSize > maxHeapSize + offHeapSize) - // Offheap is enabled, but limited. - throw new IgniteCheckedException("Maximum GGFS space size cannot be greater than size of available heap " + - "memory and offheap storage [maxHeapSize=" + maxHeapSize + ", offHeapSize=" + offHeapSize + - ", maxGgfsSpaceSize=" + maxSpaceSize + ']'); - } - - if (dataCache.configuration().getCacheMode() == PARTITIONED) { - int backups = dataCache.configuration().getBackups(); - - if (backups != 0) - throw new IgniteCheckedException("GGFS data cache cannot be used with backups (set backup count " + - "to 0 and restart the grid): " + cfg.getDataCacheName()); - } - - if (cfg.getMaxSpaceSize() == 0 && dataCache.configuration().getMemoryMode() == OFFHEAP_VALUES) - U.warn(log, "GGFS max space size is not specified but data cache values are stored off-heap (max " + - "space will be limited to 80% of max JVM heap size): " + cfg.getName()); - - boolean secondary = cfg.getDefaultMode() == PROXY; - - if (cfg.getPathModes() != null) { - for (Map.Entry<String, IgniteFsMode> mode : cfg.getPathModes().entrySet()) { - if (mode.getValue() == PROXY) - secondary = true; - } - } - - if (secondary) { - // When working in any mode except of primary, secondary FS config must be provided. - assertParameter(cfg.getSecondaryFileSystem() != null, - "secondaryFileSystem cannot be null when mode is SECONDARY"); - } - - cfgNames.add(name); - } - } - - /** - * Check GGFS config on remote node. - * - * @param rmtNode Remote node. - * @throws IgniteCheckedException If check failed. - */ - private void checkGgfsOnRemoteNode(ClusterNode rmtNode) throws IgniteCheckedException { - GridGgfsAttributes[] locAttrs = ctx.discovery().localNode().attribute(GridNodeAttributes.ATTR_GGFS); - GridGgfsAttributes[] rmtAttrs = rmtNode.attribute(GridNodeAttributes.ATTR_GGFS); - - if (F.isEmpty(locAttrs) || F.isEmpty(rmtAttrs)) - return; - - assert rmtAttrs != null && locAttrs != null; - - for (GridGgfsAttributes rmtAttr : rmtAttrs) - for (GridGgfsAttributes locAttr : locAttrs) { - // Checking the use of different caches on the different GGFSes. - if (!F.eq(rmtAttr.ggfsName(), locAttr.ggfsName())) { - if (F.eq(rmtAttr.metaCacheName(), locAttr.metaCacheName())) - throw new IgniteCheckedException("Meta cache names should be different for different GGFS instances " + - "configuration (fix configuration or set " + - "-D" + GG_SKIP_CONFIGURATION_CONSISTENCY_CHECK + "=true system " + - "property) [metaCacheName=" + rmtAttr.metaCacheName() + - ", locNodeId=" + ctx.localNodeId() + - ", rmtNodeId=" + rmtNode.id() + - ", locGgfsName=" + locAttr.ggfsName() + - ", rmtGgfsName=" + rmtAttr.ggfsName() + ']'); - - if (F.eq(rmtAttr.dataCacheName(), locAttr.dataCacheName())) - throw new IgniteCheckedException("Data cache names should be different for different GGFS instances " + - "configuration (fix configuration or set " + - "-D" + GG_SKIP_CONFIGURATION_CONSISTENCY_CHECK + "=true system " + - "property)[dataCacheName=" + rmtAttr.dataCacheName() + - ", locNodeId=" + ctx.localNodeId() + - ", rmtNodeId=" + rmtNode.id() + - ", locGgfsName=" + locAttr.ggfsName() + - ", rmtGgfsName=" + rmtAttr.ggfsName() + ']'); - - continue; - } - - // Compare other attributes only for GGFSes with same name. - checkSame("Data block size", "BlockSize", rmtNode.id(), rmtAttr.blockSize(), - locAttr.blockSize(), rmtAttr.ggfsName()); - - checkSame("Affinity mapper group size", "GrpSize", rmtNode.id(), rmtAttr.groupSize(), - locAttr.groupSize(), rmtAttr.ggfsName()); - - checkSame("Meta cache name", "MetaCacheName", rmtNode.id(), rmtAttr.metaCacheName(), - locAttr.metaCacheName(), rmtAttr.ggfsName()); - - checkSame("Data cache name", "DataCacheName", rmtNode.id(), rmtAttr.dataCacheName(), - locAttr.dataCacheName(), rmtAttr.ggfsName()); - - checkSame("Default mode", "DefaultMode", rmtNode.id(), rmtAttr.defaultMode(), - locAttr.defaultMode(), rmtAttr.ggfsName()); - - checkSame("Path modes", "PathModes", rmtNode.id(), rmtAttr.pathModes(), - locAttr.pathModes(), rmtAttr.ggfsName()); - - checkSame("Fragmentizer enabled", "FragmentizerEnabled", rmtNode.id(), rmtAttr.fragmentizerEnabled(), - locAttr.fragmentizerEnabled(), rmtAttr.ggfsName()); - } - } - - private void checkSame(String name, String propName, UUID rmtNodeId, Object rmtVal, Object locVal, String ggfsName) - throws IgniteCheckedException { - if (!F.eq(rmtVal, locVal)) - throw new IgniteCheckedException(name + " should be the same on all nodes in grid for GGFS configuration " + - "(fix configuration or set " + - "-D" + GG_SKIP_CONFIGURATION_CONSISTENCY_CHECK + "=true system " + - "property ) [rmtNodeId=" + rmtNodeId + - ", rmt" + propName + "=" + rmtVal + - ", loc" + propName + "=" + locVal + - ", ggfName=" + ggfsName + ']'); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsProcessorAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsProcessorAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsProcessorAdapter.java deleted file mode 100644 index e56d0df..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsProcessorAdapter.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.ggfs; - -import org.apache.ignite.*; -import org.apache.ignite.compute.*; -import org.apache.ignite.fs.*; -import org.apache.ignite.fs.mapreduce.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.*; -import org.apache.ignite.internal.util.ipc.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -/** - * GGFS processor adapter. - */ -public abstract class GridGgfsProcessorAdapter extends GridProcessorAdapter { - /** - * Constructor. - * - * @param ctx Kernal context. - */ - protected GridGgfsProcessorAdapter(GridKernalContext ctx) { - super(ctx); - } - - /** - * Gets all GGFS instances. - * - * @return Collection of GGFS instances. - */ - public abstract Collection<IgniteFs> ggfss(); - - /** - * Gets GGFS instance. - * - * @param name (Nullable) GGFS name. - * @return GGFS instance. - */ - @Nullable public abstract IgniteFs ggfs(@Nullable String name); - - /** - * Gets server endpoints for particular GGFS. - * - * @param name GGFS name. - * @return Collection of endpoints or {@code null} in case GGFS is not defined. - */ - public abstract Collection<GridIpcServerEndpoint> endpoints(@Nullable String name); - - /** - * Create compute job for the given GGFS job. - * - * @param job GGFS job. - * @param ggfsName GGFS name. - * @param path Path. - * @param start Start position. - * @param length Length. - * @param recRslv Record resolver. - * @return Compute job. - */ - @Nullable public abstract ComputeJob createJob(IgniteFsJob job, @Nullable String ggfsName, IgniteFsPath path, - long start, long length, IgniteFsRecordResolver recRslv); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsSamplingKey.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsSamplingKey.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsSamplingKey.java deleted file mode 100644 index b7d338c..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsSamplingKey.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.ggfs; - -import org.gridgain.grid.kernal.processors.cache.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.io.*; - -/** - * Internal key used to track if sampling enabled or disabled for particular GGFS instance. - */ -class GridGgfsSamplingKey implements GridCacheInternal, Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** GGFS name. */ - private String name; - - /** - * Default constructor. - * - * @param name - GGFS name. - */ - GridGgfsSamplingKey(String name) { - this.name = name; - } - - /** - * Empty constructor required for {@link Externalizable}. - */ - public GridGgfsSamplingKey() { - // No-op. - } - - /** - * @return GGFS name. - */ - public String name() { - return name; - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return name == null ? 0 : name.hashCode(); - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object obj) { - return this == obj || (obj instanceof GridGgfsSamplingKey && F.eq(name, ((GridGgfsSamplingKey)obj).name)); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeString(out, name); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException { - name = U.readString(in); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridGgfsSamplingKey.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsSecondaryInputStreamDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsSecondaryInputStreamDescriptor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsSecondaryInputStreamDescriptor.java deleted file mode 100644 index 5fcda2f..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsSecondaryInputStreamDescriptor.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.ggfs; - -import org.apache.ignite.fs.*; - -/** - * Descriptor of an input stream opened to the secondary file system. - */ -public class GridGgfsSecondaryInputStreamDescriptor { - /** File info in the primary file system. */ - private final GridGgfsFileInfo info; - - /** Secondary file system input stream wrapper. */ - private final IgniteFsReader secReader; - - /** - * Constructor. - * - * @param info File info in the primary file system. - * @param secReader Secondary file system reader. - */ - GridGgfsSecondaryInputStreamDescriptor(GridGgfsFileInfo info, IgniteFsReader secReader) { - assert info != null; - assert secReader != null; - - this.info = info; - this.secReader = secReader; - } - - /** - * @return File info in the primary file system. - */ - GridGgfsFileInfo info() { - return info; - } - - /** - * @return Secondary file system reader. - */ - IgniteFsReader reader() { - return secReader; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsSecondaryOutputStreamDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsSecondaryOutputStreamDescriptor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsSecondaryOutputStreamDescriptor.java deleted file mode 100644 index 1acf53f..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsSecondaryOutputStreamDescriptor.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.ggfs; - -import org.apache.ignite.lang.*; - -import java.io.*; - -/** - * Descriptor of an output stream opened to the secondary file system. - */ -public class GridGgfsSecondaryOutputStreamDescriptor { - /** Parent ID in the primary file system. */ - private final IgniteUuid parentId; - - /** File info in the primary file system. */ - private final GridGgfsFileInfo info; - - /** Output stream to the secondary file system. */ - private final OutputStream out; - - /** - * Constructor. - * - * @param parentId Parent ID in the primary file system. - * @param info File info in the primary file system. - * @param out Output stream to the secondary file system. - */ - GridGgfsSecondaryOutputStreamDescriptor(IgniteUuid parentId, GridGgfsFileInfo info, OutputStream out) { - assert parentId != null; - assert info != null; - assert out != null; - - this.parentId = parentId; - this.info = info; - this.out = out; - } - - /** - * @return Parent ID in the primary file system. - */ - IgniteUuid parentId() { - return parentId; - } - - /** - * @return File info in the primary file system. - */ - GridGgfsFileInfo info() { - return info; - } - - /** - * @return Output stream to the secondary file system. - */ - OutputStream out() { - return out; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsServer.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsServer.java deleted file mode 100644 index 1e1e149..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsServer.java +++ /dev/null @@ -1,427 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.ggfs; - -import org.apache.ignite.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.thread.*; -import org.apache.ignite.internal.fs.common.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.internal.util.ipc.*; -import org.apache.ignite.internal.util.ipc.loopback.*; -import org.apache.ignite.internal.util.ipc.shmem.*; -import org.apache.ignite.internal.util.worker.*; -import org.jdk8.backport.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; - -import static org.apache.ignite.spi.IgnitePortProtocol.*; - -/** - * GGFS server. Handles requests passed from GGFS clients. - */ -public class GridGgfsServer { - /** GGFS context. */ - private final GridGgfsContext ggfsCtx; - - /** Logger. */ - private final IgniteLogger log; - - /** GGFS marshaller. */ - private final GridGgfsMarshaller marsh; - - /** Endpoint configuration. */ - private final Map<String,String> endpointCfg; - - /** Server endpoint. */ - private GridIpcServerEndpoint srvEndpoint; - - /** Server message handler. */ - private GridGgfsServerHandler hnd; - - /** Accept worker. */ - private AcceptWorker acceptWorker; - - /** Started client workers. */ - private ConcurrentLinkedDeque8<ClientWorker> clientWorkers = new ConcurrentLinkedDeque8<>(); - - /** Flag indicating if this a management endpoint. */ - private final boolean mgmt; - - /** - * Constructs ggfs server manager. - * @param ggfsCtx GGFS context. - * @param endpointCfg Endpoint configuration to start. - * @param mgmt Management flag - if true, server is intended to be started for Visor. - */ - public GridGgfsServer(GridGgfsContext ggfsCtx, Map<String,String> endpointCfg, boolean mgmt) { - assert ggfsCtx != null; - assert endpointCfg != null; - - this.endpointCfg = endpointCfg; - this.ggfsCtx = ggfsCtx; - this.mgmt = mgmt; - - log = ggfsCtx.kernalContext().log(GridGgfsServer.class); - - marsh = new GridGgfsMarshaller(); - } - - /** - * Starts this server. - * - * @throws IgniteCheckedException If failed. - */ - public void start() throws IgniteCheckedException { - srvEndpoint = GridIpcServerEndpointDeserializer.deserialize(endpointCfg); - - if (U.isWindows() && srvEndpoint instanceof GridIpcSharedMemoryServerEndpoint) - throw new IgniteCheckedException(GridIpcSharedMemoryServerEndpoint.class.getSimpleName() + - " should not be configured on Windows (configure " + - GridIpcServerTcpEndpoint.class.getSimpleName() + ")"); - - if (srvEndpoint instanceof GridIpcServerTcpEndpoint) { - GridIpcServerTcpEndpoint srvEndpoint0 = (GridIpcServerTcpEndpoint)srvEndpoint; - - srvEndpoint0.setManagement(mgmt); - - if (srvEndpoint0.getHost() == null) { - if (mgmt) { - String locHostName = ggfsCtx.kernalContext().config().getLocalHost(); - - try { - srvEndpoint0.setHost(U.resolveLocalHost(locHostName).getHostAddress()); - } - catch (IOException e) { - throw new IgniteCheckedException("Failed to resolve local host: " + locHostName, e); - } - } - else - // Bind non-management endpoint to 127.0.0.1 by default. - srvEndpoint0.setHost("127.0.0.1"); - } - } - - ggfsCtx.kernalContext().resource().injectGeneric(srvEndpoint); - - srvEndpoint.start(); - - // GridIpcServerEndpoint.getPort contract states return -1 if there is no port to be registered. - if (srvEndpoint.getPort() >= 0) - ggfsCtx.kernalContext().ports().registerPort(srvEndpoint.getPort(), TCP, srvEndpoint.getClass()); - - hnd = new GridGgfsIpcHandler(ggfsCtx, mgmt); - - // Start client accept worker. - acceptWorker = new AcceptWorker(); - } - - /** - * Callback that is invoked when kernal is ready. - */ - public void onKernalStart() { - // Accept connections only when grid is ready. - if (srvEndpoint != null) - new IgniteThread(acceptWorker).start(); - } - - /** - * Stops this server. - * - * @param cancel Cancel flag. - */ - public void stop(boolean cancel) { - // Skip if did not start. - if (srvEndpoint == null) - return; - - // Stop accepting new client connections. - U.cancel(acceptWorker); - - U.join(acceptWorker, log); - - // Stop server handler, no more requests on existing connections will be processed. - try { - hnd.stop(); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to stop GGFS server handler (will close client connections anyway).", e); - } - - // Stop existing client connections. - for (ClientWorker worker : clientWorkers) - U.cancel(worker); - - U.join(clientWorkers, log); - - // GridIpcServerEndpoint.getPort contract states return -1 if there is no port to be registered. - if (srvEndpoint.getPort() >= 0) - ggfsCtx.kernalContext().ports().deregisterPort(srvEndpoint.getPort(), TCP, srvEndpoint.getClass()); - - try { - ggfsCtx.kernalContext().resource().cleanupGeneric(srvEndpoint); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to cleanup server endpoint.", e); - } - } - - /** - * Gets IPC server endpoint. - * - * @return IPC server endpoint. - */ - public GridIpcServerEndpoint getIpcServerEndpoint() { - return srvEndpoint; - } - - /** - * Client reader thread. - */ - private class ClientWorker extends GridWorker { - /** Connected client endpoint. */ - private GridIpcEndpoint endpoint; - - /** Data output stream. */ - private final GridGgfsDataOutputStream out; - - /** Client session object. */ - private GridGgfsClientSession ses; - - /** Queue node for fast unlink. */ - private ConcurrentLinkedDeque8.Node<ClientWorker> node; - - /** - * Creates client worker. - * - * @param idx Worker index for worker thread naming. - * @param endpoint Connected client endpoint. - * @throws IgniteCheckedException If endpoint output stream cannot be obtained. - */ - protected ClientWorker(GridIpcEndpoint endpoint, int idx) throws IgniteCheckedException { - super(ggfsCtx.kernalContext().gridName(), "ggfs-client-worker-" + idx, log); - - this.endpoint = endpoint; - - ses = new GridGgfsClientSession(); - - out = new GridGgfsDataOutputStream(new BufferedOutputStream(endpoint.outputStream())); - } - - /** {@inheritDoc} */ - @Override protected void body() throws InterruptedException, IgniteInterruptedException { - try { - GridGgfsDataInputStream dis = new GridGgfsDataInputStream(endpoint.inputStream()); - - byte[] hdr = new byte[GridGgfsMarshaller.HEADER_SIZE]; - - boolean first = true; - - while (!Thread.currentThread().isInterrupted()) { - dis.readFully(hdr); - - final long reqId = U.bytesToLong(hdr, 0); - - int ordinal = U.bytesToInt(hdr, 8); - - if (first) { // First message must be HANDSHAKE. - if (reqId != 0 || ordinal != GridGgfsIpcCommand.HANDSHAKE.ordinal()) { - U.warn(log, "Handshake failed."); - - return; - } - - first = false; - } - - final GridGgfsIpcCommand cmd = GridGgfsIpcCommand.valueOf(ordinal); - - GridGgfsMessage msg = marsh.unmarshall(cmd, hdr, dis); - - IgniteFuture<GridGgfsMessage> fut = hnd.handleAsync(ses, msg, dis); - - // If fut is null, no response is required. - if (fut != null) { - if (fut.isDone()) { - GridGgfsMessage res; - - try { - res = fut.get(); - } - catch (IgniteCheckedException e) { - res = new GridGgfsControlResponse(); - - ((GridGgfsControlResponse)res).error(e); - } - - try { - synchronized (out) { - // Reuse header. - GridGgfsMarshaller.fillHeader(hdr, reqId, res.command()); - - marsh.marshall(res, hdr, out); - - out.flush(); - } - } - catch (IOException | IgniteCheckedException e) { - shutdown0(e); - } - } - else { - fut.listenAsync(new CIX1<IgniteFuture<GridGgfsMessage>>() { - @Override public void applyx(IgniteFuture<GridGgfsMessage> fut) { - GridGgfsMessage res; - - try { - res = fut.get(); - } - catch (IgniteCheckedException e) { - res = new GridGgfsControlResponse(); - - ((GridGgfsControlResponse)res).error(e); - } - - try { - synchronized (out) { - byte[] hdr = GridGgfsMarshaller.createHeader(reqId, res.command()); - - marsh.marshall(res, hdr, out); - - out.flush(); - } - } - catch (IOException | IgniteCheckedException e) { - shutdown0(e); - } - } - }); - } - } - } - } - catch (EOFException ignored) { - // Client closed connection. - } - catch (IgniteCheckedException | IOException e) { - if (!isCancelled()) - U.error(log, "Failed to read data from client (will close connection)", e); - } - finally { - onFinished(); - } - } - - /** - * @param node Node in queue for this worker. - */ - public void node(ConcurrentLinkedDeque8.Node<ClientWorker> node) { - this.node = node; - } - - /** {@inheritDoc} */ - @Override public void cancel() { - super.cancel(); - - shutdown0(null); - } - - /** - * @param e Optional exception occurred while stopping this - */ - private void shutdown0(@Nullable Throwable e) { - if (!isCancelled()) { - if (e != null) - U.error(log, "Stopping client reader due to exception: " + endpoint, e); - } - - U.closeQuiet(out); - - endpoint.close(); - } - - /** - * Final resource cleanup. - */ - private void onFinished() { - // Second close is no-op, if closed manually. - U.closeQuiet(out); - - endpoint.close(); - - // Finally, remove from queue. - if (clientWorkers.unlinkx(node)) - hnd.onClosed(ses); - } - } - - /** - * Accept worker. - */ - private class AcceptWorker extends GridWorker { - /** Accept index. */ - private int acceptCnt; - - /** - * Creates accept worker. - */ - protected AcceptWorker() { - super(ggfsCtx.kernalContext().gridName(), "ggfs-accept-worker", log); - } - - /** {@inheritDoc} */ - @Override protected void body() throws InterruptedException, IgniteInterruptedException { - try { - while (!Thread.currentThread().isInterrupted()) { - GridIpcEndpoint client = srvEndpoint.accept(); - - if (log.isDebugEnabled()) - log.debug("GGFS client connected [ggfsName=" + ggfsCtx.kernalContext().gridName() + - ", client=" + client + ']'); - - ClientWorker worker = new ClientWorker(client, acceptCnt++); - - IgniteThread workerThread = new IgniteThread(worker); - - ConcurrentLinkedDeque8.Node<ClientWorker> node = clientWorkers.addx(worker); - - worker.node(node); - - workerThread.start(); - } - } - catch (IgniteCheckedException e) { - if (!isCancelled()) - U.error(log, "Failed to accept client IPC connection (will shutdown accept thread).", e); - } - finally { - srvEndpoint.close(); - } - } - - /** {@inheritDoc} */ - @Override public void cancel() { - super.cancel(); - - srvEndpoint.close(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsServerHandler.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsServerHandler.java deleted file mode 100644 index fdfad70..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsServerHandler.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.ggfs; - -import org.apache.ignite.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.internal.fs.common.*; -import org.jetbrains.annotations.*; - -import java.io.*; - -/** - * GGFS server message handler. Server component that is plugged in to the server implementation - * to handle incoming messages asynchronously. - */ -public interface GridGgfsServerHandler { - /** - * Asynchronously handles incoming message. - * - * @param ses Client session. - * @param msg Message to process. - * @param in Data input. Stream to read from in case if this is a WRITE_BLOCK message. - * @return Future that will be completed when response is ready or {@code null} if no - * response is required. - */ - @Nullable public IgniteFuture<GridGgfsMessage> handleAsync(GridGgfsClientSession ses, - GridGgfsMessage msg, DataInput in); - - /** - * Handles handles client close events. - * - * @param ses Session that was closed. - */ - public void onClosed(GridGgfsClientSession ses); - - /** - * Stops handling of incoming requests. No server commands will be handled anymore. - * - * @throws IgniteCheckedException If error occurred. - */ - public void stop() throws IgniteCheckedException; -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsServerManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsServerManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsServerManager.java deleted file mode 100644 index daf9926..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsServerManager.java +++ /dev/null @@ -1,211 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.ggfs; - -import org.apache.ignite.*; -import org.apache.ignite.fs.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.thread.*; -import org.apache.ignite.internal.util.ipc.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.internal.util.worker.*; - -import java.util.*; -import java.util.concurrent.*; - -import static org.apache.ignite.fs.IgniteFsConfiguration.*; - -/** - * GGFS server manager. - */ -public class GridGgfsServerManager extends GridGgfsManager { - /** IPC server rebind interval. */ - private static final long REBIND_INTERVAL = 3000; - - /** Collection of servers to maintain. */ - private Collection<GridGgfsServer> srvrs; - - /** Server port binders. */ - private BindWorker bindWorker; - - /** Kernal start latch. */ - private CountDownLatch kernalStartLatch = new CountDownLatch(1); - - /** {@inheritDoc} */ - @Override protected void start0() throws IgniteCheckedException { - IgniteFsConfiguration ggfsCfg = ggfsCtx.configuration(); - Map<String,String> cfg = ggfsCfg.getIpcEndpointConfiguration(); - - if (F.isEmpty(cfg)) { - // Set default configuration. - cfg = new HashMap<>(); - - cfg.put("type", U.isWindows() ? "tcp" : "shmem"); - cfg.put("port", String.valueOf(DFLT_IPC_PORT)); - } - - if (ggfsCfg.isIpcEndpointEnabled()) - bind(cfg, /*management*/false); - - if (ggfsCfg.getManagementPort() >= 0) { - cfg = new HashMap<>(); - - cfg.put("type", "tcp"); - cfg.put("port", String.valueOf(ggfsCfg.getManagementPort())); - - bind(cfg, /*management*/true); - } - - if (bindWorker != null) - new IgniteThread(bindWorker).start(); - } - - /** - * Tries to start server endpoint with specified configuration. If failed, will print warning and start a thread - * that will try to periodically start this endpoint. - * - * @param endpointCfg Endpoint configuration to start. - * @param mgmt {@code True} if endpoint is management. - * @throws IgniteCheckedException If failed. - */ - private void bind(final Map<String,String> endpointCfg, final boolean mgmt) throws IgniteCheckedException { - if (srvrs == null) - srvrs = new ConcurrentLinkedQueue<>(); - - GridGgfsServer ipcSrv = new GridGgfsServer(ggfsCtx, endpointCfg, mgmt); - - try { - ipcSrv.start(); - - srvrs.add(ipcSrv); - } - catch (GridIpcEndpointBindException ignored) { - int port = ipcSrv.getIpcServerEndpoint().getPort(); - - String portMsg = port != -1 ? " Failed to bind to port (is port already in use?): " + port : ""; - - U.warn(log, "Failed to start GGFS " + (mgmt ? "management " : "") + "endpoint " + - "(will retry every " + (REBIND_INTERVAL / 1000) + "s)." + - portMsg); - - if (bindWorker == null) - bindWorker = new BindWorker(); - - bindWorker.addConfiguration(endpointCfg, mgmt); - } - } - - /** - * @return Collection of active endpoints. - */ - public Collection<GridIpcServerEndpoint> endpoints() { - return F.viewReadOnly(srvrs, new C1<GridGgfsServer, GridIpcServerEndpoint>() { - @Override public GridIpcServerEndpoint apply(GridGgfsServer e) { - return e.getIpcServerEndpoint(); - } - }); - } - - /** {@inheritDoc} */ - @Override protected void onKernalStart0() throws IgniteCheckedException { - if (!F.isEmpty(srvrs)) { - for (GridGgfsServer srv : srvrs) - srv.onKernalStart(); - } - - kernalStartLatch.countDown(); - } - - /** {@inheritDoc} */ - @Override protected void stop0(boolean cancel) { - // Safety. - kernalStartLatch.countDown(); - - if (bindWorker != null) { - bindWorker.cancel(); - - U.join(bindWorker, log); - } - - if (!F.isEmpty(srvrs)) { - for (GridGgfsServer srv : srvrs) - srv.stop(cancel); - } - } - - /** - * Bind worker. - */ - @SuppressWarnings("BusyWait") - private class BindWorker extends GridWorker { - /** Configurations to bind. */ - private Collection<IgniteBiTuple<Map<String, String>, Boolean>> bindCfgs = new LinkedList<>(); - - /** - * Constructor. - */ - private BindWorker() { - super(ggfsCtx.kernalContext().gridName(), "bind-worker", ggfsCtx.kernalContext().log()); - } - - /** - * Adds configuration to bind on. Should not be called after thread start. - * - * @param cfg Configuration. - * @param mgmt Management flag. - */ - public void addConfiguration(Map<String, String> cfg, boolean mgmt) { - bindCfgs.add(F.t(cfg, mgmt)); - } - - /** {@inheritDoc} */ - @Override protected void body() throws InterruptedException { - kernalStartLatch.await(); - - while (!isCancelled()) { - Thread.sleep(REBIND_INTERVAL); - - Iterator<IgniteBiTuple<Map<String, String>, Boolean>> it = bindCfgs.iterator(); - - while (it.hasNext()) { - IgniteBiTuple<Map<String, String>, Boolean> cfg = it.next(); - - GridGgfsServer ipcSrv = new GridGgfsServer(ggfsCtx, cfg.get1(), cfg.get2()); - - try { - ipcSrv.start(); - - ipcSrv.onKernalStart(); - - srvrs.add(ipcSrv); - - it.remove(); - } - catch (IgniteCheckedException e) { - if (GridWorker.log.isDebugEnabled()) - GridWorker.log.debug("Failed to bind GGFS endpoint [cfg=" + cfg + ", err=" + e.getMessage() + ']'); - } - } - - if (bindCfgs.isEmpty()) - break; - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsStatus.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsStatus.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsStatus.java deleted file mode 100644 index 6fdd487..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsStatus.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.ggfs; - -import java.io.*; - -/** - * GGFS response for status request. - */ -public class GridGgfsStatus implements Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Total space size. */ - private long spaceTotal; - - /** Used space in GGFS. */ - private long spaceUsed; - - /** - * Empty constructor required by {@link Externalizable}. - */ - public GridGgfsStatus() { - // No-op. - } - - /** - * @param spaceUsed Used space in GGFS. - * @param spaceTotal Total space available in GGFS. - */ - public GridGgfsStatus(long spaceUsed, long spaceTotal) { - this.spaceUsed = spaceUsed; - this.spaceTotal = spaceTotal; - } - - /** - * @return Total space available in GGFS. - */ - public long spaceTotal() { - return spaceTotal; - } - - /** - * @return Used space in GGFS. - */ - public long spaceUsed() { - return spaceUsed; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeLong(spaceUsed); - out.writeLong(spaceTotal); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - spaceUsed = in.readLong(); - spaceTotal = in.readLong(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsSyncMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsSyncMessage.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsSyncMessage.java deleted file mode 100644 index 150b82d..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsSyncMessage.java +++ /dev/null @@ -1,161 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.ggfs; - -import org.apache.ignite.internal.util.direct.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.io.*; -import java.nio.*; - -/** - * Basic sync message. - */ -public class GridGgfsSyncMessage extends GridGgfsCommunicationMessage { - /** */ - private static final long serialVersionUID = 0L; - - /** Coordinator node order. */ - private long order; - - /** Response flag. */ - private boolean res; - - /** - * Empty constructor required by {@link Externalizable}. - */ - public GridGgfsSyncMessage() { - // No-op. - } - - /** - * @param order Node order. - * @param res Response flag. - */ - public GridGgfsSyncMessage(long order, boolean res) { - this.order = order; - this.res = res; - } - - /** - * @return Coordinator node order. - */ - public long order() { - return order; - } - - /** - * @return {@code True} if response message. - */ - public boolean response() { - return res; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridGgfsSyncMessage.class, this); - } - - /** {@inheritDoc} */ - @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) - @Override public GridTcpCommunicationMessageAdapter clone() { - GridGgfsSyncMessage _clone = new GridGgfsSyncMessage(); - - clone0(_clone); - - return _clone; - } - - /** {@inheritDoc} */ - @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { - super.clone0(_msg); - - GridGgfsSyncMessage _clone = (GridGgfsSyncMessage)_msg; - - _clone.order = order; - _clone.res = res; - } - - /** {@inheritDoc} */ - @SuppressWarnings("all") - @Override public boolean writeTo(ByteBuffer buf) { - commState.setBuffer(buf); - - if (!super.writeTo(buf)) - return false; - - if (!commState.typeWritten) { - if (!commState.putByte(directType())) - return false; - - commState.typeWritten = true; - } - - switch (commState.idx) { - case 0: - if (!commState.putLong(order)) - return false; - - commState.idx++; - - case 1: - if (!commState.putBoolean(res)) - return false; - - commState.idx++; - - } - - return true; - } - - /** {@inheritDoc} */ - @SuppressWarnings("all") - @Override public boolean readFrom(ByteBuffer buf) { - commState.setBuffer(buf); - - if (!super.readFrom(buf)) - return false; - - switch (commState.idx) { - case 0: - if (buf.remaining() < 8) - return false; - - order = commState.getLong(); - - commState.idx++; - - case 1: - if (buf.remaining() < 1) - return false; - - res = commState.getBoolean(); - - commState.idx++; - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public byte directType() { - return 72; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsThread.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsThread.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsThread.java deleted file mode 100644 index 7ef27ab..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsThread.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.ggfs; - -import org.apache.ignite.internal.util.typedef.*; - -/** - * GGFS ad-hoc thread. - */ -public abstract class GridGgfsThread extends Thread { - /** - * Creates {@code GGFS} add-hoc thread. - */ - protected GridGgfsThread() { - super("ggfs-worker"); - } - - /** - * Creates {@code GGFS} add-hoc thread. - * - * @param name Thread name. - */ - protected GridGgfsThread(String name) { - super(name); - } - - /** {@inheritDoc} */ - @Override public final void run() { - try { - body(); - } - catch (InterruptedException ignore) { - interrupt(); - } - // Catch all. - catch (Throwable e) { - X.error("Failed to execute GGFS ad-hoc thread: " + e.getMessage()); - - e.printStackTrace(); - } - finally { - try { - cleanup(); - } - // Catch all. - catch (Throwable e) { - X.error("Failed to clean up GGFS ad-hoc thread: " + e.getMessage()); - - e.printStackTrace(); - } - } - } - - /** - * Thread body. - * - * @throws InterruptedException If interrupted. - */ - protected abstract void body() throws InterruptedException; - - /** - * Cleanup. - */ - protected void cleanup() { - // No-op. - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridNoopGgfsHelper.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridNoopGgfsHelper.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridNoopGgfsHelper.java deleted file mode 100644 index e91ffe3..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridNoopGgfsHelper.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.ggfs; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; - -/** - * No-op utils processor adapter. - */ -public class GridNoopGgfsHelper implements GridGgfsHelper { - /** {@inheritDoc} */ - @Override public void preProcessCacheConfiguration(CacheConfiguration cfg) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void validateCacheConfiguration(CacheConfiguration cfg) throws IgniteCheckedException { - // No-op. - } - - /** {@inheritDoc} */ - @Override public boolean isGgfsBlockKey(Object key) { - return false; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridNoopGgfsProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridNoopGgfsProcessor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridNoopGgfsProcessor.java deleted file mode 100644 index d01c7b0..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridNoopGgfsProcessor.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.ggfs; - -import org.apache.ignite.*; -import org.apache.ignite.compute.*; -import org.apache.ignite.fs.*; -import org.apache.ignite.fs.mapreduce.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.util.ipc.*; -import org.apache.ignite.internal.util.typedef.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -/** - * Nop GGFS processor implementation. - */ -public class GridNoopGgfsProcessor extends GridGgfsProcessorAdapter { - /** - * Constructor. - * - * @param ctx Kernal context. - */ - public GridNoopGgfsProcessor(GridKernalContext ctx) { - super(ctx); - } - - /** {@inheritDoc} */ - @Override public void printMemoryStats() { - X.println(">>>"); - X.println(">>> GGFS processor memory stats [grid=" + ctx.gridName() + ']'); - X.println(">>> ggfsCacheSize: " + 0); - } - - /** {@inheritDoc} */ - @Override public Collection<IgniteFs> ggfss() { - return Collections.emptyList(); - } - - /** {@inheritDoc} */ - @Nullable @Override public IgniteFs ggfs(@Nullable String name) { - return null; - } - - /** {@inheritDoc} */ - @Override public Collection<GridIpcServerEndpoint> endpoints(@Nullable String name) { - return Collections.emptyList(); - } - - /** {@inheritDoc} */ - @Nullable @Override public ComputeJob createJob(IgniteFsJob job, @Nullable String ggfsName, IgniteFsPath path, - long start, long length, IgniteFsRecordResolver recRslv) { - return null; - } -}
