http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsModeResolver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsModeResolver.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsModeResolver.java new file mode 100644 index 0000000..4e3a300 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsModeResolver.java @@ -0,0 +1,177 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.fs;

import org.apache.ignite.fs.*;
import org.apache.ignite.internal.util.*;
import org.apache.ignite.internal.util.typedef.*;
import org.jetbrains.annotations.*;

import java.util.*;

/**
 * Resolves the {@link IgniteFsMode} of a GGFS path from a default mode and an optional list of
 * (path prefix, mode) pairs. The most specific configured prefix (the one with the most path components)
 * wins; paths matching no prefix get the default mode.
 * <p>
 * When prefixes are configured, resolved results are remembered in two maps created with a bound of
 * {@code MAX_PATH_CACHE}.
 */
public class GridGgfsModeResolver {
    /** Maximum size of map with cached path modes. */
    private static final int MAX_PATH_CACHE = 1000;

    /** Default mode. */
    private final IgniteFsMode dfltMode;

    /** Modes for particular paths. Ordered from longest to shortest. {@code null} if no modes are configured. */
    private final List<T2<IgniteFsPath, IgniteFsMode>> modes;

    /** Cached modes per path. {@code null} if no modes are configured. */
    private final Map<IgniteFsPath, IgniteFsMode> modesCache;

    /** Cached children modes per path. {@code null} if no modes are configured. */
    private final Map<IgniteFsPath, Set<IgniteFsMode>> childrenModesCache;

    /**
     * @param dfltMode Default GGFS mode.
     * @param modes List of configured modes. The list is copied, the argument itself is not modified.
     */
    public GridGgfsModeResolver(IgniteFsMode dfltMode, @Nullable List<T2<IgniteFsPath, IgniteFsMode>> modes) {
        assert dfltMode != null;

        this.dfltMode = dfltMode;

        if (modes == null) {
            // Nothing to resolve against: every path gets the default mode and no caches are needed.
            this.modes = null;

            modesCache = null;
            childrenModesCache = null;

            return;
        }

        List<T2<IgniteFsPath, IgniteFsMode>> modes0 = new ArrayList<>(modes);

        // Sort paths, longest (most components) first.
        Collections.sort(modes0, new Comparator<Map.Entry<IgniteFsPath, IgniteFsMode>>() {
            @Override public int compare(Map.Entry<IgniteFsPath, IgniteFsMode> o1,
                Map.Entry<IgniteFsPath, IgniteFsMode> o2) {
                return Integer.compare(o2.getKey().components().size(), o1.getKey().components().size());
            }
        });

        this.modes = modes0;

        modesCache = new GridBoundedConcurrentLinkedHashMap<>(MAX_PATH_CACHE);
        childrenModesCache = new GridBoundedConcurrentLinkedHashMap<>(MAX_PATH_CACHE);
    }

    /**
     * Resolves GGFS mode for the given path.
     *
     * @param path GGFS path.
     * @return GGFS mode.
     */
    public IgniteFsMode resolveMode(IgniteFsPath path) {
        assert path != null;

        if (modes == null)
            return dfltMode;

        IgniteFsMode mode = modesCache.get(path);

        if (mode == null) {
            for (T2<IgniteFsPath, IgniteFsMode> entry : modes) {
                if (startsWith(path, entry.getKey())) {
                    // As modes ordered from most specific to least specific first mode found is ours.
                    mode = entry.getValue();

                    break;
                }
            }

            if (mode == null)
                mode = dfltMode;

            modesCache.put(path, mode);
        }

        return mode;
    }

    /**
     * @param path Path.
     * @return Unmodifiable set of all modes that children paths could have.
     */
    public Set<IgniteFsMode> resolveChildrenModes(IgniteFsPath path) {
        assert path != null;

        if (modes == null)
            return Collections.singleton(dfltMode);

        Set<IgniteFsMode> children = childrenModesCache.get(path);

        if (children == null) {
            Set<IgniteFsMode> found = EnumSet.noneOf(IgniteFsMode.class);

            IgniteFsMode pathDefault = dfltMode;

            // Entries are ordered longest first, so configured paths located below the given path are
            // visited before the first configured prefix of the path itself, which ends the scan.
            for (T2<IgniteFsPath, IgniteFsMode> child : modes) {
                if (startsWith(path, child.getKey())) {
                    pathDefault = child.getValue();

                    break;
                }
                else if (startsWith(child.getKey(), path))
                    found.add(child.getValue());
            }

            found.add(pathDefault);

            // The very same instance is handed out to every caller, so it must not be mutable.
            children = Collections.unmodifiableSet(found);

            childrenModesCache.put(path, children);
        }

        return children;
    }

    /**
     * @return Unmodifiable view of properly ordered modes prefixes
     *      or {@code null} if no modes set.
     */
    @Nullable public List<T2<IgniteFsPath, IgniteFsMode>> modesOrdered() {
        return modes != null ? Collections.unmodifiableList(modes) : null;
    }

    /**
     * Check if path starts with prefix.
     *
     * @param path Path.
     * @param prefix Prefix.
     * @return {@code true} if path starts with prefix, {@code false} if not.
     */
    private static boolean startsWith(IgniteFsPath path, IgniteFsPath prefix) {
        List<String> pathComps = path.components();
        List<String> prefixComps = prefix.components();

        // Prefix cannot be longer than the path it prefixes.
        if (prefixComps.size() > pathComps.size())
            return false;

        for (int i = 0; i < prefixComps.size(); i++) {
            String prefixComp = prefixComps.get(i);

            if (prefixComp == null)
                // A null prefix component ends the comparison: everything before it has matched.
                return true;

            if (!pathComps.get(i).equals(prefixComp))
                return false;
        }

        // All prefix components matched.
        return true;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsPaths.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsPaths.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsPaths.java new file mode 100644 index 0000000..99181cd --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsPaths.java @@ -0,0 +1,124 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.fs;

import org.apache.ignite.fs.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.jetbrains.annotations.*;

import java.io.*;
import java.util.*;

/**
 * Externalizable holder of GGFS path mode settings: secondary file system properties,
 * the default mode and the optional list of per-path modes.
 */
public class GridGgfsPaths implements Externalizable {
    /** */
    private static final long serialVersionUID = 0L;

    /** Additional secondary file system properties. */
    private Map<String, String> props;

    /** Default GGFS mode. */
    private IgniteFsMode dfltMode;

    /** Path modes, may be {@code null}. */
    private List<T2<IgniteFsPath, IgniteFsMode>> pathModes;

    /**
     * Empty constructor required by {@link Externalizable}.
     */
    public GridGgfsPaths() {
        // No-op.
    }

    /**
     * Constructor.
     *
     * @param props Additional secondary file system properties.
     * @param dfltMode Default GGFS mode.
     * @param pathModes Path modes.
     */
    public GridGgfsPaths(Map<String, String> props, IgniteFsMode dfltMode,
        @Nullable List<T2<IgniteFsPath, IgniteFsMode>> pathModes) {
        this.props = props;
        this.dfltMode = dfltMode;
        this.pathModes = pathModes;
    }

    /**
     * @return Secondary file system properties.
     */
    public Map<String, String> properties() {
        return props;
    }

    /**
     * @return Default GGFS mode.
     */
    public IgniteFsMode defaultMode() {
        return dfltMode;
    }

    /**
     * @return Path modes.
     */
    @Nullable public List<T2<IgniteFsPath, IgniteFsMode>> pathModes() {
        return pathModes;
    }

    /**
     * Writes properties, then the default mode, then a presence flag for path modes which, when
     * {@code true}, is followed by the number of entries and each (path, mode) pair.
     *
     * {@inheritDoc}
     */
    @Override public void writeExternal(ObjectOutput out) throws IOException {
        U.writeStringMap(out, props);
        U.writeEnum0(out, dfltMode);

        boolean hasPathModes = pathModes != null;

        out.writeBoolean(hasPathModes);

        if (!hasPathModes)
            return;

        out.writeInt(pathModes.size());

        for (T2<IgniteFsPath, IgniteFsMode> e : pathModes) {
            e.getKey().writeExternal(out);

            U.writeEnum0(out, e.getValue());
        }
    }

    /**
     * Reads the fields in exactly the order {@link #writeExternal(ObjectOutput)} wrote them.
     *
     * {@inheritDoc}
     */
    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
        props = U.readStringMap(in);
        dfltMode = IgniteFsMode.fromOrdinal(U.readEnumOrdinal0(in));

        // Presence flag: nothing else was written when path modes were absent.
        if (!in.readBoolean())
            return;

        int cnt = in.readInt();

        pathModes = new ArrayList<>(cnt);

        for (int idx = 0; idx < cnt; idx++) {
            IgniteFsPath p = new IgniteFsPath();

            p.readExternal(in);

            IgniteFsMode m = IgniteFsMode.fromOrdinal(U.readEnumOrdinal0(in));

            pathModes.add(new T2<>(p, m));
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsProcessor.java new file mode 100644 index 0000000..07ea5f1 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsProcessor.java @@ -0,0 +1,463 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.fs;

import org.apache.ignite.*;
import org.apache.ignite.cache.*;
import org.apache.ignite.cache.affinity.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.compute.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.fs.*;
import org.apache.ignite.fs.mapreduce.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.lang.*;
import org.gridgain.grid.kernal.processors.cache.*;
import org.apache.ignite.internal.processors.license.*;
import org.apache.ignite.internal.util.direct.*;
import org.apache.ignite.internal.util.ipc.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.jdk8.backport.*;
import org.jetbrains.annotations.*;

import java.util.*;
import java.util.concurrent.*;

import static org.apache.ignite.IgniteSystemProperties.*;
import static org.apache.ignite.cache.GridCacheMemoryMode.*;
import static org.apache.ignite.cache.GridCacheMode.*;
import static org.apache.ignite.fs.IgniteFsMode.*;
import static org.apache.ignite.internal.GridNodeAttributes.*;
import static org.apache.ignite.internal.processors.license.GridLicenseSubsystem.*;

/**
 * Fully operational GGFS processor.
 * <p>
 * On start it registers GGFS communication messages, validates the local GGFS configurations and creates
 * one {@link GridGgfsContext} (with its managers) per configured GGFS. On kernal start it compares local
 * GGFS attributes with those of remote nodes unless the consistency check is switched off.
 */
public class GridGgfsProcessor extends GridGgfsProcessorAdapter {
    /** Null GGFS name. */
    private static final String NULL_NAME = UUID.randomUUID().toString();

    /** Converts context to GGFS. */
    private static final IgniteClosure<GridGgfsContext,IgniteFs> CTX_TO_GGFS = new C1<GridGgfsContext, IgniteFs>() {
        @Override public IgniteFs apply(GridGgfsContext ggfsCtx) {
            return ggfsCtx.ggfs();
        }
    };

    /** GGFS contexts keyed by masked GGFS name. */
    private final ConcurrentMap<String, GridGgfsContext> ggfsCache =
        new ConcurrentHashMap8<>();

    /**
     * @param ctx Kernal context.
     */
    public GridGgfsProcessor(GridKernalContext ctx) {
        super(ctx);
    }

    /** {@inheritDoc} */
    @Override public void start() throws IgniteCheckedException {
        if (ctx.config().isDaemon())
            return;

        IgniteFsConfiguration[] cfgs = ctx.config().getGgfsConfiguration();

        assert cfgs != null && cfgs.length > 0;

        // Register GGFS messages.
        GridTcpCommunicationMessageFactory.registerCommon(new GridTcpCommunicationMessageProducer() {
            @Override public GridTcpCommunicationMessageAdapter create(byte type) {
                switch (type) {
                    case 65:
                        return new GridGgfsAckMessage();

                    case 66:
                        return new GridGgfsBlockKey();

                    case 67:
                        return new GridGgfsBlocksMessage();

                    case 68:
                        return new GridGgfsDeleteMessage();

                    case 69:
                        return new GridGgfsFileAffinityRange();

                    case 70:
                        return new GridGgfsFragmentizerRequest();

                    case 71:
                        return new GridGgfsFragmentizerResponse();

                    case 72:
                        return new GridGgfsSyncMessage();

                    default:
                        assert false : "Invalid GGFS message type.";

                        return null;
                }
            }
        }, 65, 66, 67, 68, 69, 70, 71, 72);

        // Register HDFS edition usage with license manager.
        GridLicenseUseRegistry.onUsage(HADOOP, getClass());

        validateLocalGgfsConfigurations(cfgs);

        // Start GGFS instances.
        for (IgniteFsConfiguration cfg : cfgs)
            startGgfs(cfg);

        if (log.isDebugEnabled())
            log.debug("GGFS processor started.");
    }

    /**
     * Creates context for a single GGFS, starts its managers and registers the context under the masked
     * GGFS name.
     *
     * @param cfg GGFS configuration.
     * @throws IgniteCheckedException If context creation or a manager start failed.
     */
    private void startGgfs(IgniteFsConfiguration cfg) throws IgniteCheckedException {
        GridGgfsContext ggfsCtx = new GridGgfsContext(
            ctx,
            new IgniteFsConfiguration(cfg),
            new GridGgfsMetaManager(),
            new GridGgfsDataManager(),
            new GridGgfsServerManager(),
            new GridGgfsFragmentizerManager());

        // Start managers first.
        for (GridGgfsManager mgr : ggfsCtx.managers())
            mgr.start(ggfsCtx);

        ggfsCache.put(maskName(cfg.getName()), ggfsCtx);
    }

    /** {@inheritDoc} */
    @Override public void onKernalStart() throws IgniteCheckedException {
        if (ctx.config().isDaemon())
            return;

        boolean skipCheck = getBoolean(GG_SKIP_CONFIGURATION_CONSISTENCY_CHECK);

        if (!skipCheck) {
            for (ClusterNode rmtNode : ctx.discovery().remoteNodes())
                checkGgfsOnRemoteNode(rmtNode);
        }

        for (GridGgfsContext ggfsCtx : ggfsCache.values()) {
            for (GridGgfsManager mgr : ggfsCtx.managers())
                mgr.onKernalStart();
        }
    }

    /** {@inheritDoc} */
    @Override public void stop(boolean cancel) {
        // Stop GGFS instances.
        for (GridGgfsContext ggfsCtx : ggfsCache.values()) {
            if (log.isDebugEnabled())
                log.debug("Stopping ggfs: " + ggfsCtx.configuration().getName());

            List<GridGgfsManager> mgrs = ggfsCtx.managers();

            // Managers are stopped in the order reverse to the listed one.
            for (int i = mgrs.size() - 1; i >= 0; i--)
                mgrs.get(i).stop(cancel);

            ggfsCtx.ggfs().stop();
        }

        ggfsCache.clear();

        if (log.isDebugEnabled())
            log.debug("GGFS processor stopped.");
    }

    /** {@inheritDoc} */
    @Override public void onKernalStop(boolean cancel) {
        for (GridGgfsContext ggfsCtx : ggfsCache.values()) {
            if (log.isDebugEnabled())
                log.debug("Stopping ggfs: " + ggfsCtx.configuration().getName());

            List<GridGgfsManager> mgrs = ggfsCtx.managers();

            // Managers are notified in the order reverse to the listed one.
            for (int i = mgrs.size() - 1; i >= 0; i--)
                mgrs.get(i).onKernalStop(cancel);
        }

        if (log.isDebugEnabled())
            log.debug("Finished executing GGFS processor onKernalStop() callback.");
    }

    /** {@inheritDoc} */
    @Override public void printMemoryStats() {
        X.println(">>>");
        X.println(">>> GGFS processor memory stats [grid=" + ctx.gridName() + ']');
        X.println(">>> ggfsCacheSize: " + ggfsCache.size());
    }

    /** {@inheritDoc} */
    @SuppressWarnings("unchecked")
    @Override public Collection<IgniteFs> ggfss() {
        return F.viewReadOnly(ggfsCache.values(), CTX_TO_GGFS);
    }

    /** {@inheritDoc} */
    @Override @Nullable public IgniteFs ggfs(@Nullable String name) {
        GridGgfsContext ggfsCtx = ggfsCache.get(maskName(name));

        if (ggfsCtx == null)
            return null;

        return ggfsCtx.ggfs();
    }

    /** {@inheritDoc} */
    @Override @Nullable public Collection<GridIpcServerEndpoint> endpoints(@Nullable String name) {
        GridGgfsContext ggfsCtx = ggfsCache.get(maskName(name));

        // Unknown GGFS yields an empty collection.
        if (ggfsCtx == null)
            return Collections.<GridIpcServerEndpoint>emptyList();

        return ggfsCtx.server().endpoints();
    }

    /** {@inheritDoc} */
    @Nullable @Override public ComputeJob createJob(IgniteFsJob job, @Nullable String ggfsName, IgniteFsPath path,
        long start, long length, IgniteFsRecordResolver recRslv) {
        return new GridGgfsJobImpl(job, ggfsName, path, start, length, recRslv);
    }

    /** {@inheritDoc} */
    @SuppressWarnings("unchecked")
    @Override public void addAttributes(Map<String, Object> attrs) throws IgniteCheckedException {
        super.addAttributes(attrs);

        IgniteConfiguration gridCfg = ctx.config();

        // Node doesn't have GGFS if it:
        // is daemon;
        // doesn't have configured GGFS;
        // doesn't have configured caches.
        if (gridCfg.isDaemon() || F.isEmpty(gridCfg.getGgfsConfiguration()) ||
            F.isEmpty(gridCfg.getCacheConfiguration()))
            return;

        // Cache configurations by cache name.
        final Map<String, CacheConfiguration> cacheCfgs = new HashMap<>();

        F.forEach(gridCfg.getCacheConfiguration(), new CI1<CacheConfiguration>() {
            @Override public void apply(CacheConfiguration c) {
                cacheCfgs.put(c.getName(), c);
            }
        });

        Collection<GridGgfsAttributes> attrVals = new ArrayList<>();

        assert gridCfg.getGgfsConfiguration() != null;

        for (IgniteFsConfiguration ggfsCfg : gridCfg.getGgfsConfiguration()) {
            CacheConfiguration cacheCfg = cacheCfgs.get(ggfsCfg.getDataCacheName());

            if (cacheCfg == null)
                continue; // No cache for the given GGFS configuration.

            GridCacheAffinityKeyMapper affMapper = cacheCfg.getAffinityMapper();

            // Do not create GGFS attributes for a node with another mapper nor throw error about invalid
            // configuration. Configuration will be validated later, while starting GridGgfsProcessor.
            if (affMapper instanceof IgniteFsGroupDataBlocksKeyMapper) {
                attrVals.add(new GridGgfsAttributes(
                    ggfsCfg.getName(),
                    ggfsCfg.getBlockSize(),
                    ((IgniteFsGroupDataBlocksKeyMapper)affMapper).groupSize(),
                    ggfsCfg.getMetaCacheName(),
                    ggfsCfg.getDataCacheName(),
                    ggfsCfg.getDefaultMode(),
                    ggfsCfg.getPathModes(),
                    ggfsCfg.isFragmentizerEnabled()));
            }
        }

        attrs.put(ATTR_GGFS, attrVals.toArray(new GridGgfsAttributes[attrVals.size()]));
    }

    /**
     * @param name Cache name.
     * @return Masked name accounting for {@code nulls}.
     */
    private String maskName(@Nullable String name) {
        if (name == null)
            return NULL_NAME;

        return name;
    }

    /**
     * Validates local GGFS configurations. Compares attributes only for GGFSes with same name.
     *
     * @param cfgs GGFS configurations
     * @throws IgniteCheckedException If any of GGFS configurations is invalid.
     */
    private void validateLocalGgfsConfigurations(IgniteFsConfiguration[] cfgs) throws IgniteCheckedException {
        Collection<String> cfgNames = new HashSet<>();

        for (IgniteFsConfiguration cfg : cfgs) {
            String name = cfg.getName();

            if (cfgNames.contains(name))
                throw new IgniteCheckedException("Duplicate GGFS name found (check configuration and " +
                    "assign unique name to each): " + name);

            GridCacheAdapter<Object, Object> dataCache = ctx.cache().internalCache(cfg.getDataCacheName());

            if (dataCache == null)
                throw new IgniteCheckedException("Data cache is not configured locally for GGFS: " + cfg);

            if (dataCache.configuration().isQueryIndexEnabled())
                throw new IgniteCheckedException("GGFS data cache cannot start with enabled query indexing.");

            GridCache<Object, Object> metaCache = ctx.cache().cache(cfg.getMetaCacheName());

            if (metaCache == null)
                throw new IgniteCheckedException("Metadata cache is not configured locally for GGFS: " + cfg);

            if (metaCache.configuration().isQueryIndexEnabled())
                throw new IgniteCheckedException("GGFS metadata cache cannot start with enabled query indexing.");

            if (F.eq(cfg.getDataCacheName(), cfg.getMetaCacheName()))
                throw new IgniteCheckedException("Cannot use same cache as both data and meta cache: " + cfg.getName());

            if (!(dataCache.configuration().getAffinityMapper() instanceof IgniteFsGroupDataBlocksKeyMapper))
                throw new IgniteCheckedException("Invalid GGFS data cache configuration (key affinity mapper class should be " +
                    IgniteFsGroupDataBlocksKeyMapper.class.getSimpleName() + "): " + cfg);

            long maxSpaceSize = cfg.getMaxSpaceSize();

            if (maxSpaceSize > 0) {
                // Max space validation.
                long maxHeapSize = Runtime.getRuntime().maxMemory();
                long offHeapSize = dataCache.configuration().getOffHeapMaxMemory();

                if (offHeapSize < 0 && maxSpaceSize > maxHeapSize)
                    // Offheap is disabled.
                    throw new IgniteCheckedException("Maximum GGFS space size cannot be greater that size of available heap " +
                        "memory [maxHeapSize=" + maxHeapSize + ", maxGgfsSpaceSize=" + maxSpaceSize + ']');
                else if (offHeapSize > 0 && maxSpaceSize > maxHeapSize + offHeapSize)
                    // Offheap is enabled, but limited.
                    throw new IgniteCheckedException("Maximum GGFS space size cannot be greater than size of available heap " +
                        "memory and offheap storage [maxHeapSize=" + maxHeapSize + ", offHeapSize=" + offHeapSize +
                        ", maxGgfsSpaceSize=" + maxSpaceSize + ']');
            }

            // Backups are rejected for partitioned data cache only.
            if (dataCache.configuration().getCacheMode() == PARTITIONED &&
                dataCache.configuration().getBackups() != 0)
                throw new IgniteCheckedException("GGFS data cache cannot be used with backups (set backup count " +
                    "to 0 and restart the grid): " + cfg.getDataCacheName());

            if (cfg.getMaxSpaceSize() == 0 && dataCache.configuration().getMemoryMode() == OFFHEAP_VALUES)
                U.warn(log, "GGFS max space size is not specified but data cache values are stored off-heap (max " +
                    "space will be limited to 80% of max JVM heap size): " + cfg.getName());

            // Secondary file system is demanded when either the default mode or any path mode is PROXY.
            boolean secondary = cfg.getDefaultMode() == PROXY;

            if (cfg.getPathModes() != null && cfg.getPathModes().containsValue(PROXY))
                secondary = true;

            if (secondary) {
                // When working in any mode except of primary, secondary FS config must be provided.
                assertParameter(cfg.getSecondaryFileSystem() != null,
                    "secondaryFileSystem cannot be null when mode is SECONDARY");
            }

            cfgNames.add(name);
        }
    }

    /**
     * Check GGFS config on remote node.
     *
     * @param rmtNode Remote node.
     * @throws IgniteCheckedException If check failed.
     */
    private void checkGgfsOnRemoteNode(ClusterNode rmtNode) throws IgniteCheckedException {
        GridGgfsAttributes[] locAttrs = ctx.discovery().localNode().attribute(GridNodeAttributes.ATTR_GGFS);
        GridGgfsAttributes[] rmtAttrs = rmtNode.attribute(GridNodeAttributes.ATTR_GGFS);

        if (F.isEmpty(locAttrs) || F.isEmpty(rmtAttrs))
            return;

        assert rmtAttrs != null && locAttrs != null;

        for (GridGgfsAttributes rmtAttr : rmtAttrs) {
            for (GridGgfsAttributes locAttr : locAttrs) {
                if (F.eq(rmtAttr.ggfsName(), locAttr.ggfsName())) {
                    // Compare other attributes only for GGFSes with same name.
                    checkSame("Data block size", "BlockSize", rmtNode.id(), rmtAttr.blockSize(),
                        locAttr.blockSize(), rmtAttr.ggfsName());

                    checkSame("Affinity mapper group size", "GrpSize", rmtNode.id(), rmtAttr.groupSize(),
                        locAttr.groupSize(), rmtAttr.ggfsName());

                    checkSame("Meta cache name", "MetaCacheName", rmtNode.id(), rmtAttr.metaCacheName(),
                        locAttr.metaCacheName(), rmtAttr.ggfsName());

                    checkSame("Data cache name", "DataCacheName", rmtNode.id(), rmtAttr.dataCacheName(),
                        locAttr.dataCacheName(), rmtAttr.ggfsName());

                    checkSame("Default mode", "DefaultMode", rmtNode.id(), rmtAttr.defaultMode(),
                        locAttr.defaultMode(), rmtAttr.ggfsName());

                    checkSame("Path modes", "PathModes", rmtNode.id(), rmtAttr.pathModes(),
                        locAttr.pathModes(), rmtAttr.ggfsName());

                    checkSame("Fragmentizer enabled", "FragmentizerEnabled", rmtNode.id(), rmtAttr.fragmentizerEnabled(),
                        locAttr.fragmentizerEnabled(), rmtAttr.ggfsName());
                }
                else {
                    // Checking the use of different caches on the different GGFSes.
                    if (F.eq(rmtAttr.metaCacheName(), locAttr.metaCacheName()))
                        throw new IgniteCheckedException("Meta cache names should be different for different GGFS instances " +
                            "configuration (fix configuration or set " +
                            "-D" + GG_SKIP_CONFIGURATION_CONSISTENCY_CHECK + "=true system " +
                            "property) [metaCacheName=" + rmtAttr.metaCacheName() +
                            ", locNodeId=" + ctx.localNodeId() +
                            ", rmtNodeId=" + rmtNode.id() +
                            ", locGgfsName=" + locAttr.ggfsName() +
                            ", rmtGgfsName=" + rmtAttr.ggfsName() + ']');

                    if (F.eq(rmtAttr.dataCacheName(), locAttr.dataCacheName()))
                        throw new IgniteCheckedException("Data cache names should be different for different GGFS instances " +
                            "configuration (fix configuration or set " +
                            "-D" + GG_SKIP_CONFIGURATION_CONSISTENCY_CHECK + "=true system " +
                            "property)[dataCacheName=" + rmtAttr.dataCacheName() +
                            ", locNodeId=" + ctx.localNodeId() +
                            ", rmtNodeId=" + rmtNode.id() +
                            ", locGgfsName=" + locAttr.ggfsName() +
                            ", rmtGgfsName=" + rmtAttr.ggfsName() + ']');
                }
            }
        }
    }

    /**
     * Ensures that remote and local values of a GGFS configuration property are equal.
     *
     * @param name Human readable property name used at the beginning of the error message.
     * @param propName Property name suffix used to label remote and local values in the error message.
     * @param rmtNodeId Remote node ID.
     * @param rmtVal Remote value.
     * @param locVal Local value.
     * @param ggfsName GGFS name.
     * @throws IgniteCheckedException If values are not equal.
     */
    private void checkSame(String name, String propName, UUID rmtNodeId, Object rmtVal, Object locVal, String ggfsName)
        throws IgniteCheckedException {
        if (F.eq(rmtVal, locVal))
            return;

        throw new IgniteCheckedException(name + " should be the same on all nodes in grid for GGFS configuration " +
            "(fix configuration or set " +
            "-D" + GG_SKIP_CONFIGURATION_CONSISTENCY_CHECK + "=true system " +
            "property ) [rmtNodeId=" + rmtNodeId +
            ", rmt" + propName + "=" + rmtVal +
            ", loc" + propName + "=" + locVal +
            ", ggfName=" + ggfsName + ']');
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsProcessorAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsProcessorAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsProcessorAdapter.java new file mode 100644 index 0000000..d769f78 ---
/dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsProcessorAdapter.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.fs; + +import org.apache.ignite.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.fs.*; +import org.apache.ignite.fs.mapreduce.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.*; +import org.apache.ignite.internal.util.ipc.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * GGFS processor adapter. + */ +public abstract class GridGgfsProcessorAdapter extends GridProcessorAdapter { + /** + * Constructor. + * + * @param ctx Kernal context. + */ + protected GridGgfsProcessorAdapter(GridKernalContext ctx) { + super(ctx); + } + + /** + * Gets all GGFS instances. + * + * @return Collection of GGFS instances. + */ + public abstract Collection<IgniteFs> ggfss(); + + /** + * Gets GGFS instance. + * + * @param name (Nullable) GGFS name. + * @return GGFS instance. + */ + @Nullable public abstract IgniteFs ggfs(@Nullable String name); + + /** + * Gets server endpoints for particular GGFS. + * + * @param name GGFS name. 
+ * @return Collection of endpoints or {@code null} in case GGFS is not defined. + */ + public abstract Collection<GridIpcServerEndpoint> endpoints(@Nullable String name); + + /** + * Create compute job for the given GGFS job. + * + * @param job GGFS job. + * @param ggfsName GGFS name. + * @param path Path. + * @param start Start position. + * @param length Length. + * @param recRslv Record resolver. + * @return Compute job. + */ + @Nullable public abstract ComputeJob createJob(IgniteFsJob job, @Nullable String ggfsName, IgniteFsPath path, + long start, long length, IgniteFsRecordResolver recRslv); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsSamplingKey.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsSamplingKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsSamplingKey.java new file mode 100644 index 0000000..039af48 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsSamplingKey.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.fs; + +import org.gridgain.grid.kernal.processors.cache.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; + +/** + * Internal key used to track if sampling enabled or disabled for particular GGFS instance. + */ +class GridGgfsSamplingKey implements GridCacheInternal, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** GGFS name. */ + private String name; + + /** + * Default constructor. + * + * @param name - GGFS name. + */ + GridGgfsSamplingKey(String name) { + this.name = name; + } + + /** + * Empty constructor required for {@link Externalizable}. + */ + public GridGgfsSamplingKey() { + // No-op. + } + + /** + * @return GGFS name. + */ + public String name() { + return name; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return name == null ? 
0 : name.hashCode(); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object obj) { + return this == obj || (obj instanceof GridGgfsSamplingKey && F.eq(name, ((GridGgfsSamplingKey)obj).name)); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeString(out, name); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException { + name = U.readString(in); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridGgfsSamplingKey.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsSecondaryInputStreamDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsSecondaryInputStreamDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsSecondaryInputStreamDescriptor.java new file mode 100644 index 0000000..37b9e5c --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsSecondaryInputStreamDescriptor.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.fs; + +import org.apache.ignite.fs.*; + +/** + * Descriptor of an input stream opened to the secondary file system. + */ +public class GridGgfsSecondaryInputStreamDescriptor { + /** File info in the primary file system. */ + private final GridGgfsFileInfo info; + + /** Secondary file system input stream wrapper. */ + private final IgniteFsReader secReader; + + /** + * Constructor. + * + * @param info File info in the primary file system. + * @param secReader Secondary file system reader. + */ + GridGgfsSecondaryInputStreamDescriptor(GridGgfsFileInfo info, IgniteFsReader secReader) { + assert info != null; + assert secReader != null; + + this.info = info; + this.secReader = secReader; + } + + /** + * @return File info in the primary file system. + */ + GridGgfsFileInfo info() { + return info; + } + + /** + * @return Secondary file system reader. + */ + IgniteFsReader reader() { + return secReader; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsSecondaryOutputStreamDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsSecondaryOutputStreamDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsSecondaryOutputStreamDescriptor.java new file mode 100644 index 0000000..aa2548d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsSecondaryOutputStreamDescriptor.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.fs; + +import org.apache.ignite.lang.*; + +import java.io.*; + +/** + * Descriptor of an output stream opened to the secondary file system. + */ +public class GridGgfsSecondaryOutputStreamDescriptor { + /** Parent ID in the primary file system. */ + private final IgniteUuid parentId; + + /** File info in the primary file system. */ + private final GridGgfsFileInfo info; + + /** Output stream to the secondary file system. */ + private final OutputStream out; + + /** + * Constructor. + * + * @param parentId Parent ID in the primary file system. + * @param info File info in the primary file system. + * @param out Output stream to the secondary file system. + */ + GridGgfsSecondaryOutputStreamDescriptor(IgniteUuid parentId, GridGgfsFileInfo info, OutputStream out) { + assert parentId != null; + assert info != null; + assert out != null; + + this.parentId = parentId; + this.info = info; + this.out = out; + } + + /** + * @return Parent ID in the primary file system. + */ + IgniteUuid parentId() { + return parentId; + } + + /** + * @return File info in the primary file system. + */ + GridGgfsFileInfo info() { + return info; + } + + /** + * @return Output stream to the secondary file system. 
+ */ + OutputStream out() { + return out; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsServer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsServer.java new file mode 100644 index 0000000..73010c5 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsServer.java @@ -0,0 +1,427 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * GGFS server. Handles requests passed from GGFS clients.
 * <p>
 * One server owns one IPC endpoint. {@link #start()} creates and starts the endpoint,
 * {@link #onKernalStart()} launches the thread that accepts client connections, and every accepted
 * connection is served by its own {@code ClientWorker}. {@link #stop(boolean)} tears all of it down.
 */
public class GridGgfsServer {
    /** GGFS context. */
    private final GridGgfsContext ggfsCtx;

    /** Logger. */
    private final IgniteLogger log;

    /** GGFS marshaller. */
    private final GridGgfsMarshaller marsh;

    /** Endpoint configuration. */
    private final Map<String,String> endpointCfg;

    /** Server endpoint. Stays {@code null} until {@link #start()} is called. */
    private GridIpcServerEndpoint srvEndpoint;

    /** Server message handler. Created in {@link #start()}. */
    private GridGgfsServerHandler hnd;

    /** Accept worker. Created in {@link #start()}, its thread is started in {@link #onKernalStart()}. */
    private AcceptWorker acceptWorker;

    /** Started client workers. */
    private ConcurrentLinkedDeque8<ClientWorker> clientWorkers = new ConcurrentLinkedDeque8<>();

    /** Flag indicating whether this is a management endpoint. */
    private final boolean mgmt;

    /**
     * Constructs GGFS server. Nothing is bound or started here, see {@link #start()}.
     *
     * @param ggfsCtx GGFS context.
     * @param endpointCfg Endpoint configuration to start.
     * @param mgmt Management flag - if true, server is intended to be started for Visor.
     */
    public GridGgfsServer(GridGgfsContext ggfsCtx, Map<String,String> endpointCfg, boolean mgmt) {
        assert ggfsCtx != null;
        assert endpointCfg != null;

        this.endpointCfg = endpointCfg;
        this.ggfsCtx = ggfsCtx;
        this.mgmt = mgmt;

        log = ggfsCtx.kernalContext().log(GridGgfsServer.class);

        marsh = new GridGgfsMarshaller();
    }

    /**
     * Starts this server: deserializes the endpoint from configuration, picks a default host for
     * TCP endpoints that have none, starts the endpoint, registers its port and creates the message
     * handler and the accept worker. Client connections are not accepted until
     * {@link #onKernalStart()} is called.
     *
     * @throws IgniteCheckedException If failed.
     */
    public void start() throws IgniteCheckedException {
        srvEndpoint = GridIpcServerEndpointDeserializer.deserialize(endpointCfg);

        // Shared memory endpoint is rejected on Windows, TCP endpoint has to be configured instead.
        if (U.isWindows() && srvEndpoint instanceof GridIpcSharedMemoryServerEndpoint)
            throw new IgniteCheckedException(GridIpcSharedMemoryServerEndpoint.class.getSimpleName() +
                " should not be configured on Windows (configure " +
                GridIpcServerTcpEndpoint.class.getSimpleName() + ")");

        if (srvEndpoint instanceof GridIpcServerTcpEndpoint) {
            GridIpcServerTcpEndpoint srvEndpoint0 = (GridIpcServerTcpEndpoint)srvEndpoint;

            srvEndpoint0.setManagement(mgmt);

            // Host was not configured explicitly - choose a default.
            if (srvEndpoint0.getHost() == null) {
                if (mgmt) {
                    // Management endpoint is bound to the address resolved from the node's local host setting.
                    String locHostName = ggfsCtx.kernalContext().config().getLocalHost();

                    try {
                        srvEndpoint0.setHost(U.resolveLocalHost(locHostName).getHostAddress());
                    }
                    catch (IOException e) {
                        throw new IgniteCheckedException("Failed to resolve local host: " + locHostName, e);
                    }
                }
                else
                    // Bind non-management endpoint to 127.0.0.1 by default.
                    srvEndpoint0.setHost("127.0.0.1");
            }
        }

        ggfsCtx.kernalContext().resource().injectGeneric(srvEndpoint);

        srvEndpoint.start();

        // GridIpcServerEndpoint.getPort contract states return -1 if there is no port to be registered.
        if (srvEndpoint.getPort() >= 0)
            ggfsCtx.kernalContext().ports().registerPort(srvEndpoint.getPort(), TCP, srvEndpoint.getClass());

        hnd = new GridGgfsIpcHandler(ggfsCtx, mgmt);

        // Create client accept worker. Its thread is started later, in onKernalStart().
        acceptWorker = new AcceptWorker();
    }

    /**
     * Callback that is invoked when kernal is ready. Starts the accept thread, provided that
     * {@link #start()} has assigned an endpoint.
     */
    public void onKernalStart() {
        // Accept connections only when grid is ready.
        if (srvEndpoint != null)
            new IgniteThread(acceptWorker).start();
    }

    /**
     * Stops this server. The order is: stop accepting, stop the handler, close client connections,
     * deregister the port, clean up resources injected into the endpoint.
     *
     * @param cancel Cancel flag. Not read by this method.
     */
    public void stop(boolean cancel) {
        // Skip if did not start.
        if (srvEndpoint == null)
            return;

        // Stop accepting new client connections.
        U.cancel(acceptWorker);

        U.join(acceptWorker, log);

        // Stop server handler, no more requests on existing connections will be processed.
        try {
            hnd.stop();
        }
        catch (IgniteCheckedException e) {
            U.error(log, "Failed to stop GGFS server handler (will close client connections anyway).", e);
        }

        // Stop existing client connections.
        for (ClientWorker worker : clientWorkers)
            U.cancel(worker);

        U.join(clientWorkers, log);

        // GridIpcServerEndpoint.getPort contract states return -1 if there is no port to be registered.
        if (srvEndpoint.getPort() >= 0)
            ggfsCtx.kernalContext().ports().deregisterPort(srvEndpoint.getPort(), TCP, srvEndpoint.getClass());

        try {
            ggfsCtx.kernalContext().resource().cleanupGeneric(srvEndpoint);
        }
        catch (IgniteCheckedException e) {
            U.error(log, "Failed to cleanup server endpoint.", e);
        }
    }

    /**
     * Gets IPC server endpoint.
     *
     * @return IPC server endpoint, or {@code null} if {@link #start()} has not been called yet.
     */
    public GridIpcServerEndpoint getIpcServerEndpoint() {
        return srvEndpoint;
    }

    /**
     * Client reader thread. Reads requests from a single connected client, passes them to the
     * handler and writes responses back to the same client.
     */
    private class ClientWorker extends GridWorker {
        /** Connected client endpoint. */
        private GridIpcEndpoint endpoint;

        /** Data output stream. Also used as the monitor that serializes response writes. */
        private final GridGgfsDataOutputStream out;

        /** Client session object. */
        private GridGgfsClientSession ses;

        /** Queue node for fast unlink. */
        private ConcurrentLinkedDeque8.Node<ClientWorker> node;

        /**
         * Creates client worker.
         *
         * @param endpoint Connected client endpoint.
         * @param idx Worker index for worker thread naming.
         * @throws IgniteCheckedException If endpoint output stream cannot be obtained.
         */
        protected ClientWorker(GridIpcEndpoint endpoint, int idx) throws IgniteCheckedException {
            super(ggfsCtx.kernalContext().gridName(), "ggfs-client-worker-" + idx, log);

            this.endpoint = endpoint;

            ses = new GridGgfsClientSession();

            out = new GridGgfsDataOutputStream(new BufferedOutputStream(endpoint.outputStream()));
        }

        /**
         * Request loop: reads a fixed-size header, unmarshals the message, hands it to the handler
         * and sends the response - immediately if the future is already done, otherwise from a
         * listener attached to the future.
         */
        @Override protected void body() throws InterruptedException, IgniteInterruptedException {
            try {
                GridGgfsDataInputStream dis = new GridGgfsDataInputStream(endpoint.inputStream());

                byte[] hdr = new byte[GridGgfsMarshaller.HEADER_SIZE];

                boolean first = true;

                while (!Thread.currentThread().isInterrupted()) {
                    dis.readFully(hdr);

                    // Request ID is read from header offset 0, command ordinal from offset 8.
                    final long reqId = U.bytesToLong(hdr, 0);

                    int ordinal = U.bytesToInt(hdr, 8);

                    if (first) { // First message must be HANDSHAKE.
                        if (reqId != 0 || ordinal != GridGgfsIpcCommand.HANDSHAKE.ordinal()) {
                            U.warn(log, "Handshake failed.");

                            return;
                        }

                        first = false;
                    }

                    final GridGgfsIpcCommand cmd = GridGgfsIpcCommand.valueOf(ordinal);

                    GridGgfsMessage msg = marsh.unmarshall(cmd, hdr, dis);

                    IgniteFuture<GridGgfsMessage> fut = hnd.handleAsync(ses, msg, dis);

                    // If fut is null, no response is required.
                    if (fut != null) {
                        if (fut.isDone()) {
                            GridGgfsMessage res;

                            try {
                                res = fut.get();
                            }
                            catch (IgniteCheckedException e) {
                                // Handler failure is reported to the client as a control response carrying the error.
                                res = new GridGgfsControlResponse();

                                ((GridGgfsControlResponse)res).error(e);
                            }

                            try {
                                // Listener callbacks below write to the same stream, hence the lock.
                                synchronized (out) {
                                    // Reuse header.
                                    GridGgfsMarshaller.fillHeader(hdr, reqId, res.command());

                                    marsh.marshall(res, hdr, out);

                                    out.flush();
                                }
                            }
                            catch (IOException | IgniteCheckedException e) {
                                shutdown0(e);
                            }
                        }
                        else {
                            fut.listenAsync(new CIX1<IgniteFuture<GridGgfsMessage>>() {
                                @Override public void applyx(IgniteFuture<GridGgfsMessage> fut) {
                                    GridGgfsMessage res;

                                    try {
                                        res = fut.get();
                                    }
                                    catch (IgniteCheckedException e) {
                                        res = new GridGgfsControlResponse();

                                        ((GridGgfsControlResponse)res).error(e);
                                    }

                                    try {
                                        synchronized (out) {
                                            // A fresh header is created here: the reader loop keeps reusing its own buffer.
                                            byte[] hdr = GridGgfsMarshaller.createHeader(reqId, res.command());

                                            marsh.marshall(res, hdr, out);

                                            out.flush();
                                        }
                                    }
                                    catch (IOException | IgniteCheckedException e) {
                                        shutdown0(e);
                                    }
                                }
                            });
                        }
                    }
                }
            }
            catch (EOFException ignored) {
                // Client closed connection.
            }
            catch (IgniteCheckedException | IOException e) {
                // Errors after cancellation are not logged.
                if (!isCancelled())
                    U.error(log, "Failed to read data from client (will close connection)", e);
            }
            finally {
                onFinished();
            }
        }

        /**
         * @param node Node in queue for this worker.
         */
        public void node(ConcurrentLinkedDeque8.Node<ClientWorker> node) {
            this.node = node;
        }

        /** {@inheritDoc} */
        @Override public void cancel() {
            super.cancel();

            // Close the streams as well.
            shutdown0(null);
        }

        /**
         * Closes the output stream and the client endpoint. The exception, if any, is logged only
         * when the worker has not been cancelled.
         *
         * @param e Optional exception that caused the shutdown.
         */
        private void shutdown0(@Nullable Throwable e) {
            if (!isCancelled()) {
                if (e != null)
                    U.error(log, "Stopping client reader due to exception: " + endpoint, e);
            }

            U.closeQuiet(out);

            endpoint.close();
        }

        /**
         * Final resource cleanup.
         */
        private void onFinished() {
            // Second close is no-op, if closed manually.
            U.closeQuiet(out);

            endpoint.close();

            // Finally, remove from queue. Handler is notified only if this call unlinked the node.
            if (clientWorkers.unlinkx(node))
                hnd.onClosed(ses);
        }
    }

    /**
     * Accept worker. Accepts client connections on the server endpoint and starts a
     * {@code ClientWorker} thread for each of them.
     */
    private class AcceptWorker extends GridWorker {
        /** Accept index. Used to give every client worker a distinct thread name. */
        private int acceptCnt;

        /**
         * Creates accept worker.
         */
        protected AcceptWorker() {
            super(ggfsCtx.kernalContext().gridName(), "ggfs-accept-worker", log);
        }

        /** {@inheritDoc} */
        @Override protected void body() throws InterruptedException, IgniteInterruptedException {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    GridIpcEndpoint client = srvEndpoint.accept();

                    if (log.isDebugEnabled())
                        log.debug("GGFS client connected [ggfsName=" + ggfsCtx.kernalContext().gridName() +
                            ", client=" + client + ']');

                    ClientWorker worker = new ClientWorker(client, acceptCnt++);

                    IgniteThread workerThread = new IgniteThread(worker);

                    // Worker is registered and given its queue node before its thread is started.
                    ConcurrentLinkedDeque8.Node<ClientWorker> node = clientWorkers.addx(worker);

                    worker.node(node);

                    workerThread.start();
                }
            }
            catch (IgniteCheckedException e) {
                if (!isCancelled())
                    U.error(log, "Failed to accept client IPC connection (will shutdown accept thread).", e);
            }
            finally {
                srvEndpoint.close();
            }
        }

        /** {@inheritDoc} */
        @Override public void cancel() {
            super.cancel();

            // Close the endpoint as part of cancellation.
            // NOTE(review): presumably this is what unblocks a pending accept() - confirm against the endpoint implementation.
            srvEndpoint.close();
        }
    }
}
/**
 * GGFS server message handler. Server component that is plugged in to the server implementation
 * to handle incoming messages asynchronously.
 */
public interface GridGgfsServerHandler {
    /**
     * Asynchronously handles incoming message.
     *
     * @param ses Client session.
     * @param msg Message to process.
     * @param in Data input. Stream to read from in case this is a WRITE_BLOCK message.
     * @return Future that will be completed when response is ready or {@code null} if no
     *      response is required.
     */
    @Nullable public IgniteFuture<GridGgfsMessage> handleAsync(GridGgfsClientSession ses,
        GridGgfsMessage msg, DataInput in);

    /**
     * Handles client close events.
     *
     * @param ses Session that was closed.
     */
    public void onClosed(GridGgfsClientSession ses);

    /**
     * Stops handling of incoming requests. No server commands will be handled anymore.
     *
     * @throws IgniteCheckedException If error occurred.
     */
    public void stop() throws IgniteCheckedException;
}
/**
 * GGFS server manager. Starts the configured IPC endpoint and the management endpoint as
 * {@link GridGgfsServer} instances. Endpoints that cannot be bound at start are retried
 * periodically by a background worker.
 */
public class GridGgfsServerManager extends GridGgfsManager {
    /** IPC server rebind interval, in milliseconds. */
    private static final long REBIND_INTERVAL = 3000;

    /** Collection of servers to maintain. Created lazily on the first {@code bind} call. */
    private Collection<GridGgfsServer> srvrs;

    /** Server port binder. Created only if some endpoint failed to bind on start. */
    private BindWorker bindWorker;

    /** Kernal start latch. The bind worker waits on it before its first rebind attempt. */
    private CountDownLatch kernalStartLatch = new CountDownLatch(1);

    /** {@inheritDoc} */
    @Override protected void start0() throws IgniteCheckedException {
        IgniteFsConfiguration ggfsCfg = ggfsCtx.configuration();
        Map<String,String> cfg = ggfsCfg.getIpcEndpointConfiguration();

        if (F.isEmpty(cfg)) {
            // Set default configuration.
            cfg = new HashMap<>();

            cfg.put("type", U.isWindows() ? "tcp" : "shmem");
            cfg.put("port", String.valueOf(DFLT_IPC_PORT));
        }

        if (ggfsCfg.isIpcEndpointEnabled())
            bind(cfg, /*management*/false);

        // Negative management port means that no management endpoint is started.
        if (ggfsCfg.getManagementPort() >= 0) {
            cfg = new HashMap<>();

            cfg.put("type", "tcp");
            cfg.put("port", String.valueOf(ggfsCfg.getManagementPort()));

            bind(cfg, /*management*/true);
        }

        // Worker exists only if at least one endpoint failed to bind above.
        if (bindWorker != null)
            new IgniteThread(bindWorker).start();
    }

    /**
     * Tries to start server endpoint with specified configuration. If binding fails, prints a
     * warning and queues the configuration for the bind worker, which retries it periodically.
     *
     * @param endpointCfg Endpoint configuration to start.
     * @param mgmt {@code True} if endpoint is management.
     * @throws IgniteCheckedException If failed.
     */
    private void bind(final Map<String,String> endpointCfg, final boolean mgmt) throws IgniteCheckedException {
        if (srvrs == null)
            srvrs = new ConcurrentLinkedQueue<>();

        GridGgfsServer ipcSrv = new GridGgfsServer(ggfsCtx, endpointCfg, mgmt);

        try {
            ipcSrv.start();

            srvrs.add(ipcSrv);
        }
        catch (GridIpcEndpointBindException ignored) {
            // Only bind failures are retried, any other start failure propagates to the caller.
            int port = ipcSrv.getIpcServerEndpoint().getPort();

            String portMsg = port != -1 ? " Failed to bind to port (is port already in use?): " + port : "";

            U.warn(log, "Failed to start GGFS " + (mgmt ? "management " : "") + "endpoint " +
                "(will retry every " + (REBIND_INTERVAL / 1000) + "s)." +
                portMsg);

            if (bindWorker == null)
                bindWorker = new BindWorker();

            bindWorker.addConfiguration(endpointCfg, mgmt);
        }
    }

    /**
     * @return Read-only view of the endpoints of all successfully started servers.
     */
    public Collection<GridIpcServerEndpoint> endpoints() {
        return F.viewReadOnly(srvrs, new C1<GridGgfsServer, GridIpcServerEndpoint>() {
            @Override public GridIpcServerEndpoint apply(GridGgfsServer e) {
                return e.getIpcServerEndpoint();
            }
        });
    }

    /** {@inheritDoc} */
    @Override protected void onKernalStart0() throws IgniteCheckedException {
        if (!F.isEmpty(srvrs)) {
            for (GridGgfsServer srv : srvrs)
                srv.onKernalStart();
        }

        // Let the bind worker proceed with rebind attempts.
        kernalStartLatch.countDown();
    }

    /** {@inheritDoc} */
    @Override protected void stop0(boolean cancel) {
        // Safety: release the bind worker if kernal start callback was never invoked.
        kernalStartLatch.countDown();

        if (bindWorker != null) {
            bindWorker.cancel();

            U.join(bindWorker, log);
        }

        if (!F.isEmpty(srvrs)) {
            for (GridGgfsServer srv : srvrs)
                srv.stop(cancel);
        }
    }

    /**
     * Bind worker. Periodically retries to start the endpoints that failed to bind on manager start.
     */
    @SuppressWarnings("BusyWait")
    private class BindWorker extends GridWorker {
        /** Configurations to bind: pairs of endpoint configuration and management flag. */
        private Collection<IgniteBiTuple<Map<String, String>, Boolean>> bindCfgs = new LinkedList<>();

        /**
         * Constructor.
         */
        private BindWorker() {
            super(ggfsCtx.kernalContext().gridName(), "bind-worker", ggfsCtx.kernalContext().log());
        }

        /**
         * Adds configuration to bind on. Should not be called after thread start.
         *
         * @param cfg Configuration.
         * @param mgmt Management flag.
         */
        public void addConfiguration(Map<String, String> cfg, boolean mgmt) {
            bindCfgs.add(F.t(cfg, mgmt));
        }

        /**
         * Waits for kernal start, then every {@code REBIND_INTERVAL} milliseconds tries to start
         * each pending endpoint. Finishes once all endpoints are bound or the worker is cancelled.
         */
        @Override protected void body() throws InterruptedException {
            kernalStartLatch.await();

            while (!isCancelled()) {
                Thread.sleep(REBIND_INTERVAL);

                Iterator<IgniteBiTuple<Map<String, String>, Boolean>> it = bindCfgs.iterator();

                while (it.hasNext()) {
                    IgniteBiTuple<Map<String, String>, Boolean> cfg = it.next();

                    GridGgfsServer ipcSrv = new GridGgfsServer(ggfsCtx, cfg.get1(), cfg.get2()); 

                    try {
                        ipcSrv.start();

                        // Kernal has already started at this point, so accept connections right away.
                        ipcSrv.onKernalStart();

                        srvrs.add(ipcSrv);

                        it.remove();
                    }
                    catch (IgniteCheckedException e) {
                        // Configuration stays in the collection and is retried on the next iteration.
                        if (GridWorker.log.isDebugEnabled())
                            GridWorker.log.debug("Failed to bind GGFS endpoint [cfg=" + cfg + ", err=" + e.getMessage() + ']');
                    }
                }

                if (bindCfgs.isEmpty())
                    break;
            }
        }
    }
}
/**
 * Response to a GGFS status request: how much space is used and how much is available in total.
 */
public class GridGgfsStatus implements Externalizable {
    /** */
    private static final long serialVersionUID = 0L;

    /** Used space in GGFS. */
    private long used;

    /** Total space size. */
    private long total;

    /**
     * No-arg constructor required by {@link Externalizable}.
     */
    public GridGgfsStatus() {
        // No-op.
    }

    /**
     * @param spaceUsed Used space in GGFS.
     * @param spaceTotal Total space available in GGFS.
     */
    public GridGgfsStatus(long spaceUsed, long spaceTotal) {
        total = spaceTotal;
        used = spaceUsed;
    }

    /**
     * @return Total space available in GGFS.
     */
    public long spaceTotal() {
        return total;
    }

    /**
     * @return Used space in GGFS.
     */
    public long spaceUsed() {
        return used;
    }

    /** {@inheritDoc} */
    @Override public void writeExternal(ObjectOutput out) throws IOException {
        // Used space goes first, total second. readExternal() reads them in the same order.
        out.writeLong(used);
        out.writeLong(total);
    }

    /** {@inheritDoc} */
    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
        used = in.readLong();
        total = in.readLong();
    }
}
/**
 * Basic sync message. Carries a coordinator node order and a flag that tells a response
 * from a request.
 */
public class GridGgfsSyncMessage extends GridGgfsCommunicationMessage {
    /** */
    private static final long serialVersionUID = 0L;

    /** Coordinator node order. */
    private long order;

    /** Response flag. */
    private boolean res;

    /**
     * Empty constructor required by {@link Externalizable}.
     */
    public GridGgfsSyncMessage() {
        // No-op.
    }

    /**
     * @param order Node order.
     * @param res Response flag.
     */
    public GridGgfsSyncMessage(long order, boolean res) {
        this.order = order;
        this.res = res;
    }

    /**
     * @return Coordinator node order.
     */
    public long order() {
        return order;
    }

    /**
     * @return {@code True} if response message.
     */
    public boolean response() {
        return res;
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(GridGgfsSyncMessage.class, this);
    }

    /** {@inheritDoc} */
    @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
    @Override public GridTcpCommunicationMessageAdapter clone() {
        // Copy is built via the empty constructor and clone0() rather than super.clone().
        GridGgfsSyncMessage _clone = new GridGgfsSyncMessage();

        clone0(_clone);

        return _clone;
    }

    /** {@inheritDoc} */
    @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
        super.clone0(_msg);

        GridGgfsSyncMessage _clone = (GridGgfsSyncMessage)_msg;

        _clone.order = order;
        _clone.res = res;
    }

    /**
     * Writes this message to the buffer: parent state first, then the type byte (once), then
     * {@code order} and {@code res}. Progress is tracked in {@code commState}.
     * NOTE(review): presumably the caller re-invokes this method with more buffer space after a
     * {@code false} result - confirm against the communication SPI.
     *
     * @param buf Buffer to write to.
     * @return {@code True} if the message was fully written, {@code false} if a write step
     *      (parent or any {@code commState.put*} call) reported failure.
     */
    @SuppressWarnings("all")
    @Override public boolean writeTo(ByteBuffer buf) {
        commState.setBuffer(buf);

        if (!super.writeTo(buf))
            return false;

        if (!commState.typeWritten) {
            if (!commState.putByte(directType()))
                return false;

            commState.typeWritten = true;
        }

        // Cases fall through on purpose: writing continues from the first field not yet written.
        switch (commState.idx) {
            case 0:
                if (!commState.putLong(order))
                    return false;

                commState.idx++;

            case 1:
                if (!commState.putBoolean(res))
                    return false;

                commState.idx++;

        }

        return true;
    }

    /**
     * Reads this message from the buffer: parent state first, then {@code order} (8 bytes) and
     * {@code res} (1 byte). Progress is tracked in {@code commState.idx}.
     *
     * @param buf Buffer to read from.
     * @return {@code True} if the message was fully read, {@code false} if the buffer does not
     *      hold enough bytes for the next field yet.
     */
    @SuppressWarnings("all")
    @Override public boolean readFrom(ByteBuffer buf) {
        commState.setBuffer(buf);

        if (!super.readFrom(buf))
            return false;

        // Cases fall through on purpose: reading continues from the first field not yet read.
        switch (commState.idx) {
            case 0:
                if (buf.remaining() < 8)
                    return false;

                order = commState.getLong();

                commState.idx++;

            case 1:
                if (buf.remaining() < 1)
                    return false;

                res = commState.getBoolean();

                commState.idx++;

        }

        return true;
    }

    /** {@inheritDoc} */
    @Override public byte directType() {
        return 72;
    }
}
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.fs; + +import org.apache.ignite.internal.util.typedef.*; + +/** + * GGFS ad-hoc thread. + */ +public abstract class GridGgfsThread extends Thread { + /** + * Creates {@code GGFS} add-hoc thread. + */ + protected GridGgfsThread() { + super("ggfs-worker"); + } + + /** + * Creates {@code GGFS} add-hoc thread. + * + * @param name Thread name. + */ + protected GridGgfsThread(String name) { + super(name); + } + + /** {@inheritDoc} */ + @Override public final void run() { + try { + body(); + } + catch (InterruptedException ignore) { + interrupt(); + } + // Catch all. + catch (Throwable e) { + X.error("Failed to execute GGFS ad-hoc thread: " + e.getMessage()); + + e.printStackTrace(); + } + finally { + try { + cleanup(); + } + // Catch all. + catch (Throwable e) { + X.error("Failed to clean up GGFS ad-hoc thread: " + e.getMessage()); + + e.printStackTrace(); + } + } + } + + /** + * Thread body. + * + * @throws InterruptedException If interrupted. + */ + protected abstract void body() throws InterruptedException; + + /** + * Cleanup. + */ + protected void cleanup() { + // No-op. + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridNoopGgfsHelper.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridNoopGgfsHelper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridNoopGgfsHelper.java new file mode 100644 index 0000000..c376ecd --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridNoopGgfsHelper.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.fs; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; + +/** + * No-op utils processor adapter. + */ +public class GridNoopGgfsHelper implements GridGgfsHelper { + /** {@inheritDoc} */ + @Override public void preProcessCacheConfiguration(CacheConfiguration cfg) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void validateCacheConfiguration(CacheConfiguration cfg) throws IgniteCheckedException { + // No-op. 
+ } + + /** {@inheritDoc} */ + @Override public boolean isGgfsBlockKey(Object key) { + return false; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridNoopGgfsProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridNoopGgfsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridNoopGgfsProcessor.java new file mode 100644 index 0000000..6e4bf7b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridNoopGgfsProcessor.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.fs; + +import org.apache.ignite.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.fs.*; +import org.apache.ignite.fs.mapreduce.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.ipc.*; +import org.apache.ignite.internal.util.typedef.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * Nop GGFS processor implementation. 
+ */ +public class GridNoopGgfsProcessor extends GridGgfsProcessorAdapter { + /** + * Constructor. + * + * @param ctx Kernal context. + */ + public GridNoopGgfsProcessor(GridKernalContext ctx) { + super(ctx); + } + + /** {@inheritDoc} */ + @Override public void printMemoryStats() { + X.println(">>>"); + X.println(">>> GGFS processor memory stats [grid=" + ctx.gridName() + ']'); + X.println(">>> ggfsCacheSize: " + 0); + } + + /** {@inheritDoc} */ + @Override public Collection<IgniteFs> ggfss() { + return Collections.emptyList(); + } + + /** {@inheritDoc} */ + @Nullable @Override public IgniteFs ggfs(@Nullable String name) { + return null; + } + + /** {@inheritDoc} */ + @Override public Collection<GridIpcServerEndpoint> endpoints(@Nullable String name) { + return Collections.emptyList(); + } + + /** {@inheritDoc} */ + @Nullable @Override public ComputeJob createJob(IgniteFsJob job, @Nullable String ggfsName, IgniteFsPath path, + long start, long length, IgniteFsRecordResolver recRslv) { + return null; + } +}
