# Renaming
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/243e521e Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/243e521e Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/243e521e Branch: refs/heads/master Commit: 243e521ec2f9ab5a01860fe0ce983d91a9d683d2 Parents: 7cd638f Author: sboikov <sboi...@gridgain.com> Authored: Fri Dec 5 11:26:58 2014 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Fri Dec 5 11:26:58 2014 +0300 ---------------------------------------------------------------------- .../examples/ggfs/GgfsMapReduceExample.java | 4 +- .../grid/ggfs/mapreduce/GridGgfsTaskArgs.java | 74 ---- .../mapreduce/GridGgfsTaskNoReduceAdapter.java | 34 -- .../ggfs/mapreduce/IgniteFsRecordResolver.java | 9 +- .../grid/ggfs/mapreduce/IgniteFsTask.java | 8 +- .../grid/ggfs/mapreduce/IgniteFsTaskArgs.java | 74 ++++ .../mapreduce/IgniteFsTaskNoReduceAdapter.java | 34 ++ .../GridGgfsByteDelimiterRecordResolver.java | 340 ------------------- .../GridGgfsFixedLengthRecordResolver.java | 79 ----- .../records/GridGgfsNewLineRecordResolver.java | 58 ---- .../GridGgfsStringDelimiterRecordResolver.java | 76 ----- .../IgniteFsByteDelimiterRecordResolver.java | 340 +++++++++++++++++++ .../IgniteFsFixedLengthRecordResolver.java | 79 +++++ .../records/IgniteFsNewLineRecordResolver.java | 58 ++++ .../IgniteFsStringDelimiterRecordResolver.java | 76 +++++ .../kernal/processors/ggfs/GridGgfsImpl.java | 4 +- .../processors/ggfs/GridGgfsTaskArgsImpl.java | 127 ------- .../processors/ggfs/IgniteFsTaskArgsImpl.java | 127 +++++++ .../processors/ggfs/GridGgfsTaskSelfTest.java | 6 +- ...GgfsByteDelimiterRecordResolverSelfTest.java | 8 +- ...idGgfsFixedLengthRecordResolverSelfTest.java | 8 +- ...sNewLineDelimiterRecordResolverSelfTest.java | 10 +- ...fsStringDelimiterRecordResolverSelfTest.java | 8 +- 23 files changed, 820 insertions(+), 821 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/243e521e/examples/src/main/java/org/gridgain/examples/ggfs/GgfsMapReduceExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/gridgain/examples/ggfs/GgfsMapReduceExample.java b/examples/src/main/java/org/gridgain/examples/ggfs/GgfsMapReduceExample.java index edd3f70..454a54e 100644 --- a/examples/src/main/java/org/gridgain/examples/ggfs/GgfsMapReduceExample.java +++ b/examples/src/main/java/org/gridgain/examples/ggfs/GgfsMapReduceExample.java @@ -64,7 +64,7 @@ public class GgfsMapReduceExample { writeFile(fs, fsPath, file); - Collection<Line> lines = fs.execute(new GrepTask(), GridGgfsNewLineRecordResolver.NEW_LINE, + Collection<Line> lines = fs.execute(new GrepTask(), IgniteFsNewLineRecordResolver.NEW_LINE, Collections.singleton(fsPath), regexStr); if (lines.isEmpty()) { @@ -122,7 +122,7 @@ public class GgfsMapReduceExample { private static class GrepTask extends IgniteFsTask<String, Collection<Line>> { /** {@inheritDoc} */ @Override public IgniteFsJob createJob(IgniteFsPath path, IgniteFsFileRange range, - GridGgfsTaskArgs<String> args) throws GridException { + IgniteFsTaskArgs<String> args) throws GridException { return new GrepJob(args.userArgument()); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/243e521e/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/GridGgfsTaskArgs.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/GridGgfsTaskArgs.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/GridGgfsTaskArgs.java deleted file mode 100644 index caa0b44..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/GridGgfsTaskArgs.java +++ /dev/null @@ -1,74 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.ggfs.mapreduce; - -import org.gridgain.grid.ggfs.*; - -import java.util.*; - -/** - * GGFS task arguments. When you initiate new GGFS task execution using one of {@code GridGgfs.execute(...)} methods, - * all passed parameters are encapsulated in a single {@code GridGgfsTaskArgs} object. Later on this object is - * passed to {@link IgniteFsTask#createJob(org.gridgain.grid.ggfs.IgniteFsPath, IgniteFsFileRange, GridGgfsTaskArgs)} method. - * <p> - * Task arguments encapsulates the following data: - * <ul> - * <li>GGFS name</li> - * <li>File paths passed to {@code GridGgfs.execute()} method</li> - * <li>{@link IgniteFsRecordResolver} for that task</li> - * <li>Flag indicating whether to skip non-existent file paths or throw an exception</li> - * <li>User-defined task argument</li> - * <li>Maximum file range length for that task (see {@link org.gridgain.grid.ggfs.IgniteFsConfiguration#getMaximumTaskRangeLength()})</li> - * </ul> - */ -public interface GridGgfsTaskArgs<T> { - /** - * Gets GGFS name. - * - * @return GGFS name. - */ - public String ggfsName(); - - /** - * Gets file paths to process. - * - * @return File paths to process. - */ - public Collection<IgniteFsPath> paths(); - - /** - * Gets record resolver for the task. - * - * @return Record resolver. - */ - public IgniteFsRecordResolver recordResolver(); - - /** - * Flag indicating whether to fail or simply skip non-existent files. - * - * @return {@code True} if non-existent files should be skipped. - */ - public boolean skipNonExistentFiles(); - - /** - * User argument provided for task execution. - * - * @return User argument. - */ - public T userArgument(); - - /** - * Optional maximum allowed range length, {@code 0} by default. If not specified, full range including - * all consecutive blocks will be used without any limitations. - * - * @return Maximum range length. - */ - public long maxRangeLength(); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/243e521e/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/GridGgfsTaskNoReduceAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/GridGgfsTaskNoReduceAdapter.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/GridGgfsTaskNoReduceAdapter.java deleted file mode 100644 index 802b7a5..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/GridGgfsTaskNoReduceAdapter.java +++ /dev/null @@ -1,34 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.ggfs.mapreduce; - -import org.apache.ignite.compute.*; - -import java.util.*; - -/** - * Convenient {@link IgniteFsTask} adapter with empty reduce step. Use this adapter in case you are not interested in - * results returned by jobs. - */ -public abstract class GridGgfsTaskNoReduceAdapter<T, R> extends IgniteFsTask<T, R> { - /** */ - private static final long serialVersionUID = 0L; - - /** - * Default implementation which will ignore all results sent from execution nodes. - * - * @param results Received results of broadcasted remote executions. Note that if task class has - * {@link org.apache.ignite.compute.ComputeTaskNoResultCache} annotation, then this list will be empty. - * @return Will always return {@code null}. - */ - @Override public R reduce(List<ComputeJobResult> results) { - return null; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/243e521e/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsRecordResolver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsRecordResolver.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsRecordResolver.java index fdddc06..e9d254f 100644 --- a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsRecordResolver.java +++ b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsRecordResolver.java @@ -12,7 +12,6 @@ package org.gridgain.grid.ggfs.mapreduce; import org.apache.ignite.*; import org.gridgain.grid.*; import org.gridgain.grid.ggfs.*; -import org.gridgain.grid.ggfs.mapreduce.records.*; import org.jetbrains.annotations.*; import java.io.*; @@ -28,10 +27,10 @@ import java.io.*; * <p> * The following record resolvers are available out of the box: * <ul> - * <li>{@link GridGgfsFixedLengthRecordResolver}</li> - * <li>{@link GridGgfsByteDelimiterRecordResolver}</li> - * <li>{@link GridGgfsStringDelimiterRecordResolver}</li> - * <li>{@link GridGgfsNewLineRecordResolver}</li> + * <li>{@link org.gridgain.grid.ggfs.mapreduce.records.IgniteFsFixedLengthRecordResolver}</li> + * <li>{@link org.gridgain.grid.ggfs.mapreduce.records.IgniteFsByteDelimiterRecordResolver}</li> + * <li>{@link org.gridgain.grid.ggfs.mapreduce.records.IgniteFsStringDelimiterRecordResolver}</li> + * <li>{@link org.gridgain.grid.ggfs.mapreduce.records.IgniteFsNewLineRecordResolver}</li> * </ul> */ public interface IgniteFsRecordResolver extends Serializable { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/243e521e/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsTask.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsTask.java index 0721d0b..edfdf03 100644 --- a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsTask.java +++ b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsTask.java @@ -26,7 +26,7 @@ import java.util.*; * GGFS task which can be executed on the grid using one of {@code GridGgfs.execute()} methods. Essentially GGFS task * is regular {@link org.apache.ignite.compute.ComputeTask} with different map logic. Instead of implementing * {@link org.apache.ignite.compute.ComputeTask#map(List, Object)} method to split task into jobs, you must implement - * {@link IgniteFsTask#createJob(org.gridgain.grid.ggfs.IgniteFsPath, IgniteFsFileRange, GridGgfsTaskArgs)} method. + * {@link IgniteFsTask#createJob(org.gridgain.grid.ggfs.IgniteFsPath, IgniteFsFileRange, IgniteFsTaskArgs)} method. * <p> * Each file participating in GGFS task is split into {@link IgniteFsFileRange}s first. Normally range is a number of * consequent bytes located on a single node (see {@code GridGgfsGroupDataBlocksKeyMapper}). In case maximum range size @@ -67,7 +67,7 @@ import java.util.*; * } * </pre> */ -public abstract class IgniteFsTask<T, R> extends ComputeTaskAdapter<GridGgfsTaskArgs<T>, R> { +public abstract class IgniteFsTask<T, R> extends ComputeTaskAdapter<IgniteFsTaskArgs<T>, R> { /** */ private static final long serialVersionUID = 0L; @@ -77,7 +77,7 @@ public abstract class IgniteFsTask<T, R> extends ComputeTaskAdapter<GridGgfsTask /** {@inheritDoc} */ @Nullable @Override public final Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, - @Nullable GridGgfsTaskArgs<T> args) throws GridException { + @Nullable IgniteFsTaskArgs<T> args) throws GridException { assert ignite != null; assert args != null; @@ -146,7 +146,7 @@ public abstract class IgniteFsTask<T, R> extends ComputeTaskAdapter<GridGgfsTask * @throws GridException If job creation failed. */ @Nullable public abstract IgniteFsJob createJob(IgniteFsPath path, IgniteFsFileRange range, - GridGgfsTaskArgs<T> args) throws GridException; + IgniteFsTaskArgs<T> args) throws GridException; /** * Maps list by node ID. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/243e521e/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsTaskArgs.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsTaskArgs.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsTaskArgs.java new file mode 100644 index 0000000..4eb7757 --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsTaskArgs.java @@ -0,0 +1,74 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.ggfs.mapreduce; + +import org.gridgain.grid.ggfs.*; + +import java.util.*; + +/** + * GGFS task arguments. When you initiate new GGFS task execution using one of {@code GridGgfs.execute(...)} methods, + * all passed parameters are encapsulated in a single {@code GridGgfsTaskArgs} object. Later on this object is + * passed to {@link IgniteFsTask#createJob(org.gridgain.grid.ggfs.IgniteFsPath, IgniteFsFileRange, IgniteFsTaskArgs)} method. + * <p> + * Task arguments encapsulates the following data: + * <ul> + * <li>GGFS name</li> + * <li>File paths passed to {@code GridGgfs.execute()} method</li> + * <li>{@link IgniteFsRecordResolver} for that task</li> + * <li>Flag indicating whether to skip non-existent file paths or throw an exception</li> + * <li>User-defined task argument</li> + * <li>Maximum file range length for that task (see {@link org.gridgain.grid.ggfs.IgniteFsConfiguration#getMaximumTaskRangeLength()})</li> + * </ul> + */ +public interface IgniteFsTaskArgs<T> { + /** + * Gets GGFS name. + * + * @return GGFS name. + */ + public String ggfsName(); + + /** + * Gets file paths to process. + * + * @return File paths to process. + */ + public Collection<IgniteFsPath> paths(); + + /** + * Gets record resolver for the task. + * + * @return Record resolver. + */ + public IgniteFsRecordResolver recordResolver(); + + /** + * Flag indicating whether to fail or simply skip non-existent files. + * + * @return {@code True} if non-existent files should be skipped. + */ + public boolean skipNonExistentFiles(); + + /** + * User argument provided for task execution. + * + * @return User argument. + */ + public T userArgument(); + + /** + * Optional maximum allowed range length, {@code 0} by default. If not specified, full range including + * all consecutive blocks will be used without any limitations. + * + * @return Maximum range length. + */ + public long maxRangeLength(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/243e521e/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsTaskNoReduceAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsTaskNoReduceAdapter.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsTaskNoReduceAdapter.java new file mode 100644 index 0000000..180d7a4 --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsTaskNoReduceAdapter.java @@ -0,0 +1,34 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.ggfs.mapreduce; + +import org.apache.ignite.compute.*; + +import java.util.*; + +/** + * Convenient {@link IgniteFsTask} adapter with empty reduce step. Use this adapter in case you are not interested in + * results returned by jobs. + */ +public abstract class IgniteFsTaskNoReduceAdapter<T, R> extends IgniteFsTask<T, R> { + /** */ + private static final long serialVersionUID = 0L; + + /** + * Default implementation which will ignore all results sent from execution nodes. + * + * @param results Received results of broadcasted remote executions. Note that if task class has + * {@link org.apache.ignite.compute.ComputeTaskNoResultCache} annotation, then this list will be empty. + * @return Will always return {@code null}. + */ + @Override public R reduce(List<ComputeJobResult> results) { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/243e521e/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/GridGgfsByteDelimiterRecordResolver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/GridGgfsByteDelimiterRecordResolver.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/GridGgfsByteDelimiterRecordResolver.java deleted file mode 100644 index 808092e..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/GridGgfsByteDelimiterRecordResolver.java +++ /dev/null @@ -1,340 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.ggfs.mapreduce.records; - -import org.apache.ignite.*; -import org.apache.ignite.lang.*; -import org.gridgain.grid.*; -import org.gridgain.grid.ggfs.*; -import org.gridgain.grid.ggfs.mapreduce.*; -import org.gridgain.grid.util.typedef.*; -import org.gridgain.grid.util.typedef.internal.*; -import org.gridgain.grid.util.tostring.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; - -/** - * Record resolver which adjusts records based on provided delimiters. Both start position and length are - * shifted to the right, based on delimiter positions. - * <p> - * Note that you can use {@link GridGgfsStringDelimiterRecordResolver} if your delimiter is a plain string. - */ -public class GridGgfsByteDelimiterRecordResolver implements IgniteFsRecordResolver, Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Delimiters. */ - private byte[][] delims; - - /** Maximum delimiter length. */ - @GridToStringExclude - private int maxDelimLen; - - /** - * Empty constructor required for {@link Externalizable} support. - */ - public GridGgfsByteDelimiterRecordResolver() { - // No-op. - } - - /** - * Creates delimiter-based record resolver. - * - * @param delims Delimiters. - */ - public GridGgfsByteDelimiterRecordResolver(byte[]... delims) { - if (delims == null || delims.length == 0) - throw new IllegalArgumentException("Delimiters cannot be null or empty."); - - this.delims = delims; - - int maxDelimLen = 0; - - for (byte[] delim : delims) { - if (delim == null) - throw new IllegalArgumentException("Delimiter cannot be null."); - else if (maxDelimLen < delim.length) - maxDelimLen = delim.length; - } - - this.maxDelimLen = maxDelimLen; - } - - /** {@inheritDoc} */ - @Override public IgniteFsFileRange resolveRecords(IgniteFs ggfs, IgniteFsInputStream stream, - IgniteFsFileRange suggestedRecord) throws GridException, IOException { - long suggestedStart = suggestedRecord.start(); - long suggestedEnd = suggestedStart + suggestedRecord.length(); - - IgniteBiTuple<State, Delimiter> firstDelim = findFirstDelimiter(stream, suggestedStart); - - State state = firstDelim != null ? firstDelim.getKey() : new State(); - - Delimiter curDelim = firstDelim.getValue(); - - while (curDelim != null && curDelim.end < suggestedStart) - curDelim = nextDelimiter(stream, state); - - if (curDelim != null && (curDelim.end >= suggestedStart && curDelim.end < suggestedEnd) || - suggestedStart == 0 ) { - // We found start delimiter. - long start = suggestedStart == 0 ? 0 : curDelim.end; - - if (curDelim == null || curDelim.end < suggestedEnd) { - IgniteBiTuple<State, Delimiter> lastDelim = findFirstDelimiter(stream, suggestedEnd); - - state = lastDelim != null ? firstDelim.getKey() : new State(); - - curDelim = lastDelim.getValue(); - - while (curDelim != null && curDelim.end < suggestedEnd) - curDelim = nextDelimiter(stream, state); - } - - long end = curDelim != null ? curDelim.end : stream.position(); - - return new IgniteFsFileRange(suggestedRecord.path(), start, end - start); - } - else - // We failed to find any delimiters up to the EOS. - return null; - } - - /** - * Calculate maximum delimiters length. - * - * @param delims Delimiters. - * @return Maximum delimiter length. - */ - private int maxDelimiterLength(byte[][] delims) { - int maxDelimLen = 0; - - for (byte[] delim : delims) { - if (delim == null) - throw new IllegalArgumentException("Delimiter cannot be null."); - else if (maxDelimLen < delim.length) - maxDelimLen = delim.length; - } - - return maxDelimLen; - } - - /** - * Find first delimiter. In order to achieve this we have to rewind the stream until we find the delimiter - * which stands at least [maxDelimLen] from the start search position or until we faced stream start. - * Otherwise we cannot be sure that delimiter position is determined correctly. - * - * @param stream GGFS input stream. - * @param startPos Start search position. - * @return The first found delimiter. - * @throws IOException In case of IO exception. - */ - @Nullable private IgniteBiTuple<State, Delimiter> findFirstDelimiter(IgniteFsInputStream stream, long startPos) - throws IOException { - State state; - Delimiter delim; - - long curPos = Math.max(0, startPos - maxDelimLen); - - while (true) { - stream.seek(curPos); - - state = new State(); - - delim = nextDelimiter(stream, state); - - if (curPos == 0 || delim == null || delim.start - curPos > maxDelimLen - 1) - break; - else - curPos = Math.max(0, curPos - maxDelimLen); - } - - return F.t(state, delim); - } - - /** - * Resolve next delimiter. - * - * @param is GGFS input stream. - * @param state Current state. - * @return Next delimiter and updated map. - * @throws IOException In case of exception. - */ - private Delimiter nextDelimiter(IgniteFsInputStream is, State state) throws IOException { - assert is != null; - assert state != null; - - Map<Integer, Integer> parts = state.parts; - LinkedList<Delimiter> delimQueue = state.delims; - - int nextByte = is.read(); - - while (nextByte != -1) { - // Process read byte. - for (int idx = 0; idx < delims.length; idx++) { - byte[] delim = delims[idx]; - - int val = parts.containsKey(idx) ? parts.get(idx) : 0; - - if (delim[val] == nextByte) { - if (val == delim.length - 1) { - // Full delimiter is found. - parts.remove(idx); - - Delimiter newDelim = new Delimiter(is.position() - delim.length, is.position()); - - // Read queue from the end looking for the "inner" delimiters. - boolean ignore = false; - - int replaceIdx = -1; - - for (int i = delimQueue.size() - 1; i >= 0; i--) { - Delimiter prevDelim = delimQueue.get(i); - - if (prevDelim.start < newDelim.start) { - if (prevDelim.end > newDelim.start) { - // Ignore this delimiter. - ignore = true; - - break; - } - } - else if (prevDelim.start == newDelim.start) { - // Ok, we found matching delimiter. - replaceIdx = i; - - break; - } - } - - if (!ignore) { - if (replaceIdx >= 0) - delimQueue.removeAll(delimQueue.subList(replaceIdx, delimQueue.size())); - - delimQueue.add(newDelim); - } - } - else - parts.put(idx, ++val); - } - else if (val != 0) { - if (delim[0] == nextByte) { - boolean shift = true; - - for (int k = 1; k < val; k++) { - if (delim[k] != nextByte) { - shift = false; - - break; - } - } - - if (!shift) - parts.put(idx, 1); - } - else - // Delimiter sequence is totally broken. - parts.remove(idx); - } - } - - // Check whether we can be sure that the first delimiter will not change. - if (!delimQueue.isEmpty()) { - Delimiter delim = delimQueue.get(0); - - if (is.position() - delim.end >= maxDelimLen) - return delimQueue.poll(); - } - - nextByte = is.read(); - } - - return delimQueue.poll(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridGgfsByteDelimiterRecordResolver.class, this); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - if (delims != null) { - out.writeBoolean(true); - - out.writeInt(delims.length); - - for (byte[] delim : delims) - U.writeByteArray(out, delim); - } - else - out.writeBoolean(false); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - if (in.readBoolean()) { - int len = in.readInt(); - - delims = new byte[len][]; - - for (int i = 0; i < len; i++) - delims[i] = U.readByteArray(in); - - maxDelimLen = maxDelimiterLength(delims); - } - } - - /** - * Delimiter descriptor. - */ - private static class Delimiter { - /** Delimiter start position. */ - private final long start; - - /** Delimiter end position. */ - private final long end; - - /** - * Constructor. - * - * @param start Delimiter start position. - * @param end Delimiter end position. - */ - private Delimiter(long start, long end) { - assert start >= 0 && end >= 0 && start <= end; - - this.start = start; - this.end = end; - } - } - - /** - * Current resolution state. - */ - private static class State { - /** Partially resolved delimiters. */ - private final Map<Integer, Integer> parts; - - /** Resolved delimiters which could potentially be merged. */ - private final LinkedList<Delimiter> delims; - - /** - * Constructor. - */ - private State() { - parts = new HashMap<>(); - - delims = new LinkedList<>(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/243e521e/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/GridGgfsFixedLengthRecordResolver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/GridGgfsFixedLengthRecordResolver.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/GridGgfsFixedLengthRecordResolver.java deleted file mode 100644 index 1edeb1a..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/GridGgfsFixedLengthRecordResolver.java +++ /dev/null @@ -1,79 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.ggfs.mapreduce.records; - -import org.apache.ignite.*; -import org.gridgain.grid.*; -import org.gridgain.grid.ggfs.*; -import org.gridgain.grid.ggfs.mapreduce.*; -import org.gridgain.grid.util.typedef.internal.*; - -import java.io.*; - -/** - * Record resolver which adjusts records to fixed length. That is, start offset of the record is shifted to the - * nearest position so that {@code newStart % length == 0}. - */ -public class GridGgfsFixedLengthRecordResolver implements IgniteFsRecordResolver, Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Record length. */ - private long recLen; - - /** - * Empty constructor required for {@link Externalizable} support. - */ - public GridGgfsFixedLengthRecordResolver() { - // No-op. - } - - /** - * Creates fixed-length record resolver. - * - * @param recLen Record length. - */ - public GridGgfsFixedLengthRecordResolver(long recLen) { - this.recLen = recLen; - } - - /** {@inheritDoc} */ - @Override public IgniteFsFileRange resolveRecords(IgniteFs ggfs, IgniteFsInputStream stream, - IgniteFsFileRange suggestedRecord) - throws GridException, IOException { - long suggestedEnd = suggestedRecord.start() + suggestedRecord.length(); - - long startRem = suggestedRecord.start() % recLen; - long endRem = suggestedEnd % recLen; - - long start = Math.min(suggestedRecord.start() + (startRem != 0 ? (recLen - startRem) : 0), - stream.length()); - long end = Math.min(suggestedEnd + (endRem != 0 ? (recLen - endRem) : 0), stream.length()); - - assert end >= start; - - return start != end ? new IgniteFsFileRange(suggestedRecord.path(), start, end - start) : null; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridGgfsFixedLengthRecordResolver.class, this); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeLong(recLen); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - recLen = in.readLong(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/243e521e/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/GridGgfsNewLineRecordResolver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/GridGgfsNewLineRecordResolver.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/GridGgfsNewLineRecordResolver.java deleted file mode 100644 index 808759f..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/GridGgfsNewLineRecordResolver.java +++ /dev/null @@ -1,58 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.ggfs.mapreduce.records; - -import org.gridgain.grid.util.typedef.internal.*; - -import java.io.*; - -/** - * Record resolver based on new line detection. This resolver can detect new lines based on '\n' or '\r\n' sequences. - * <p> - * Note that this resolver cannot be created and has one constant implementations: {@link #NEW_LINE}. - */ -public class GridGgfsNewLineRecordResolver extends GridGgfsByteDelimiterRecordResolver { - /** */ - private static final long serialVersionUID = 0L; - - /** - * Singleton new line resolver. This resolver will resolve records based on new lines - * regardless if they have '\n' or '\r\n' patterns. - */ - public static final GridGgfsNewLineRecordResolver NEW_LINE = new GridGgfsNewLineRecordResolver(true); - - /** CR symbol. */ - public static final byte SYM_CR = 0x0D; - - /** LF symbol. */ - public static final byte SYM_LF = 0x0A; - - /** - * Empty constructor required for {@link Externalizable} support. - */ - public GridGgfsNewLineRecordResolver() { - // No-op. - } - - /** - * Creates new-line record resolver. - * - * @param b Artificial flag to differentiate from empty constructor. - */ - @SuppressWarnings("UnusedParameters") - private GridGgfsNewLineRecordResolver(boolean b) { - super(new byte[] { SYM_CR, SYM_LF }, new byte[] { SYM_LF }); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridGgfsNewLineRecordResolver.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/243e521e/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/GridGgfsStringDelimiterRecordResolver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/GridGgfsStringDelimiterRecordResolver.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/GridGgfsStringDelimiterRecordResolver.java deleted file mode 100644 index 504b7e9..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/GridGgfsStringDelimiterRecordResolver.java +++ /dev/null @@ -1,76 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.ggfs.mapreduce.records; - -import org.gridgain.grid.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.nio.charset.*; - -/** - * Record resolver based on delimiters represented as strings. Works in the same way as - * {@link GridGgfsByteDelimiterRecordResolver}, but uses strings as delimiters instead of byte arrays. - */ -public class GridGgfsStringDelimiterRecordResolver extends GridGgfsByteDelimiterRecordResolver { - /** */ - private static final long serialVersionUID = 0L; - - /** - * Converts string delimiters to byte delimiters. - * - * @param charset Charset. - * @param delims String delimiters. - * @return Byte delimiters. - */ - @Nullable private static byte[][] toBytes(Charset charset, @Nullable String... delims) { - byte[][] res = null; - - if (delims != null) { - res = new byte[delims.length][]; - - for (int i = 0; i < delims.length; i++) - res[i] = delims[i].getBytes(charset); - } - - return res; - } - - /** - * Empty constructor required for {@link Externalizable} support. - */ - public GridGgfsStringDelimiterRecordResolver() { - // No-op. - } - - /** - * Creates record resolver from given string and given charset. - * - * @param delims Delimiters. - * @param charset Charset. - */ - public GridGgfsStringDelimiterRecordResolver(Charset charset, String... delims) { - super(toBytes(charset, delims)); - } - - /** - * Creates record resolver based on given string with default charset. - * - * @param delims Delimiters. - */ - public GridGgfsStringDelimiterRecordResolver(String... delims) { - super(toBytes(Charset.defaultCharset(), delims)); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridGgfsStringDelimiterRecordResolver.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/243e521e/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/IgniteFsByteDelimiterRecordResolver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/IgniteFsByteDelimiterRecordResolver.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/IgniteFsByteDelimiterRecordResolver.java new file mode 100644 index 0000000..79f928e --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/IgniteFsByteDelimiterRecordResolver.java @@ -0,0 +1,340 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.ggfs.mapreduce.records; + +import org.apache.ignite.*; +import org.apache.ignite.lang.*; +import org.gridgain.grid.*; +import org.gridgain.grid.ggfs.*; +import org.gridgain.grid.ggfs.mapreduce.*; +import org.gridgain.grid.util.typedef.*; +import org.gridgain.grid.util.typedef.internal.*; +import org.gridgain.grid.util.tostring.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; + +/** + * Record resolver which adjusts records based on provided delimiters. Both start position and length are + * shifted to the right, based on delimiter positions. + * <p> + * Note that you can use {@link IgniteFsStringDelimiterRecordResolver} if your delimiter is a plain string. + */ +public class IgniteFsByteDelimiterRecordResolver implements IgniteFsRecordResolver, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Delimiters. */ + private byte[][] delims; + + /** Maximum delimiter length. */ + @GridToStringExclude + private int maxDelimLen; + + /** + * Empty constructor required for {@link Externalizable} support. + */ + public IgniteFsByteDelimiterRecordResolver() { + // No-op. + } + + /** + * Creates delimiter-based record resolver. + * + * @param delims Delimiters. + */ + public IgniteFsByteDelimiterRecordResolver(byte[]... delims) { + if (delims == null || delims.length == 0) + throw new IllegalArgumentException("Delimiters cannot be null or empty."); + + this.delims = delims; + + int maxDelimLen = 0; + + for (byte[] delim : delims) { + if (delim == null) + throw new IllegalArgumentException("Delimiter cannot be null."); + else if (maxDelimLen < delim.length) + maxDelimLen = delim.length; + } + + this.maxDelimLen = maxDelimLen; + } + + /** {@inheritDoc} */ + @Override public IgniteFsFileRange resolveRecords(IgniteFs ggfs, IgniteFsInputStream stream, + IgniteFsFileRange suggestedRecord) throws GridException, IOException { + long suggestedStart = suggestedRecord.start(); + long suggestedEnd = suggestedStart + suggestedRecord.length(); + + IgniteBiTuple<State, Delimiter> firstDelim = findFirstDelimiter(stream, suggestedStart); + + State state = firstDelim != null ? firstDelim.getKey() : new State(); + + Delimiter curDelim = firstDelim.getValue(); + + while (curDelim != null && curDelim.end < suggestedStart) + curDelim = nextDelimiter(stream, state); + + if (curDelim != null && (curDelim.end >= suggestedStart && curDelim.end < suggestedEnd) || + suggestedStart == 0 ) { + // We found start delimiter. + long start = suggestedStart == 0 ? 0 : curDelim.end; + + if (curDelim == null || curDelim.end < suggestedEnd) { + IgniteBiTuple<State, Delimiter> lastDelim = findFirstDelimiter(stream, suggestedEnd); + + state = lastDelim != null ? firstDelim.getKey() : new State(); + + curDelim = lastDelim.getValue(); + + while (curDelim != null && curDelim.end < suggestedEnd) + curDelim = nextDelimiter(stream, state); + } + + long end = curDelim != null ? curDelim.end : stream.position(); + + return new IgniteFsFileRange(suggestedRecord.path(), start, end - start); + } + else + // We failed to find any delimiters up to the EOS. + return null; + } + + /** + * Calculate maximum delimiters length. + * + * @param delims Delimiters. + * @return Maximum delimiter length. + */ + private int maxDelimiterLength(byte[][] delims) { + int maxDelimLen = 0; + + for (byte[] delim : delims) { + if (delim == null) + throw new IllegalArgumentException("Delimiter cannot be null."); + else if (maxDelimLen < delim.length) + maxDelimLen = delim.length; + } + + return maxDelimLen; + } + + /** + * Find first delimiter. In order to achieve this we have to rewind the stream until we find the delimiter + * which stands at least [maxDelimLen] from the start search position or until we faced stream start. + * Otherwise we cannot be sure that delimiter position is determined correctly. + * + * @param stream GGFS input stream. + * @param startPos Start search position. + * @return The first found delimiter. + * @throws IOException In case of IO exception. + */ + @Nullable private IgniteBiTuple<State, Delimiter> findFirstDelimiter(IgniteFsInputStream stream, long startPos) + throws IOException { + State state; + Delimiter delim; + + long curPos = Math.max(0, startPos - maxDelimLen); + + while (true) { + stream.seek(curPos); + + state = new State(); + + delim = nextDelimiter(stream, state); + + if (curPos == 0 || delim == null || delim.start - curPos > maxDelimLen - 1) + break; + else + curPos = Math.max(0, curPos - maxDelimLen); + } + + return F.t(state, delim); + } + + /** + * Resolve next delimiter. + * + * @param is GGFS input stream. + * @param state Current state. + * @return Next delimiter and updated map. + * @throws IOException In case of exception. + */ + private Delimiter nextDelimiter(IgniteFsInputStream is, State state) throws IOException { + assert is != null; + assert state != null; + + Map<Integer, Integer> parts = state.parts; + LinkedList<Delimiter> delimQueue = state.delims; + + int nextByte = is.read(); + + while (nextByte != -1) { + // Process read byte. + for (int idx = 0; idx < delims.length; idx++) { + byte[] delim = delims[idx]; + + int val = parts.containsKey(idx) ? parts.get(idx) : 0; + + if (delim[val] == nextByte) { + if (val == delim.length - 1) { + // Full delimiter is found. + parts.remove(idx); + + Delimiter newDelim = new Delimiter(is.position() - delim.length, is.position()); + + // Read queue from the end looking for the "inner" delimiters. + boolean ignore = false; + + int replaceIdx = -1; + + for (int i = delimQueue.size() - 1; i >= 0; i--) { + Delimiter prevDelim = delimQueue.get(i); + + if (prevDelim.start < newDelim.start) { + if (prevDelim.end > newDelim.start) { + // Ignore this delimiter. + ignore = true; + + break; + } + } + else if (prevDelim.start == newDelim.start) { + // Ok, we found matching delimiter. + replaceIdx = i; + + break; + } + } + + if (!ignore) { + if (replaceIdx >= 0) + delimQueue.removeAll(delimQueue.subList(replaceIdx, delimQueue.size())); + + delimQueue.add(newDelim); + } + } + else + parts.put(idx, ++val); + } + else if (val != 0) { + if (delim[0] == nextByte) { + boolean shift = true; + + for (int k = 1; k < val; k++) { + if (delim[k] != nextByte) { + shift = false; + + break; + } + } + + if (!shift) + parts.put(idx, 1); + } + else + // Delimiter sequence is totally broken. + parts.remove(idx); + } + } + + // Check whether we can be sure that the first delimiter will not change. + if (!delimQueue.isEmpty()) { + Delimiter delim = delimQueue.get(0); + + if (is.position() - delim.end >= maxDelimLen) + return delimQueue.poll(); + } + + nextByte = is.read(); + } + + return delimQueue.poll(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgniteFsByteDelimiterRecordResolver.class, this); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + if (delims != null) { + out.writeBoolean(true); + + out.writeInt(delims.length); + + for (byte[] delim : delims) + U.writeByteArray(out, delim); + } + else + out.writeBoolean(false); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + if (in.readBoolean()) { + int len = in.readInt(); + + delims = new byte[len][]; + + for (int i = 0; i < len; i++) + delims[i] = U.readByteArray(in); + + maxDelimLen = maxDelimiterLength(delims); + } + } + + /** + * Delimiter descriptor. + */ + private static class Delimiter { + /** Delimiter start position. */ + private final long start; + + /** Delimiter end position. */ + private final long end; + + /** + * Constructor. + * + * @param start Delimiter start position. + * @param end Delimiter end position. + */ + private Delimiter(long start, long end) { + assert start >= 0 && end >= 0 && start <= end; + + this.start = start; + this.end = end; + } + } + + /** + * Current resolution state. + */ + private static class State { + /** Partially resolved delimiters. */ + private final Map<Integer, Integer> parts; + + /** Resolved delimiters which could potentially be merged. */ + private final LinkedList<Delimiter> delims; + + /** + * Constructor. + */ + private State() { + parts = new HashMap<>(); + + delims = new LinkedList<>(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/243e521e/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/IgniteFsFixedLengthRecordResolver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/IgniteFsFixedLengthRecordResolver.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/IgniteFsFixedLengthRecordResolver.java new file mode 100644 index 0000000..6190207 --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/IgniteFsFixedLengthRecordResolver.java @@ -0,0 +1,79 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.ggfs.mapreduce.records; + +import org.apache.ignite.*; +import org.gridgain.grid.*; +import org.gridgain.grid.ggfs.*; +import org.gridgain.grid.ggfs.mapreduce.*; +import org.gridgain.grid.util.typedef.internal.*; + +import java.io.*; + +/** + * Record resolver which adjusts records to fixed length. That is, start offset of the record is shifted to the + * nearest position so that {@code newStart % length == 0}. + */ +public class IgniteFsFixedLengthRecordResolver implements IgniteFsRecordResolver, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Record length. */ + private long recLen; + + /** + * Empty constructor required for {@link Externalizable} support. + */ + public IgniteFsFixedLengthRecordResolver() { + // No-op. + } + + /** + * Creates fixed-length record resolver. + * + * @param recLen Record length. + */ + public IgniteFsFixedLengthRecordResolver(long recLen) { + this.recLen = recLen; + } + + /** {@inheritDoc} */ + @Override public IgniteFsFileRange resolveRecords(IgniteFs ggfs, IgniteFsInputStream stream, + IgniteFsFileRange suggestedRecord) + throws GridException, IOException { + long suggestedEnd = suggestedRecord.start() + suggestedRecord.length(); + + long startRem = suggestedRecord.start() % recLen; + long endRem = suggestedEnd % recLen; + + long start = Math.min(suggestedRecord.start() + (startRem != 0 ? (recLen - startRem) : 0), + stream.length()); + long end = Math.min(suggestedEnd + (endRem != 0 ? (recLen - endRem) : 0), stream.length()); + + assert end >= start; + + return start != end ? new IgniteFsFileRange(suggestedRecord.path(), start, end - start) : null; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgniteFsFixedLengthRecordResolver.class, this); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeLong(recLen); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + recLen = in.readLong(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/243e521e/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/IgniteFsNewLineRecordResolver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/IgniteFsNewLineRecordResolver.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/IgniteFsNewLineRecordResolver.java new file mode 100644 index 0000000..81f359e --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/IgniteFsNewLineRecordResolver.java @@ -0,0 +1,58 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.ggfs.mapreduce.records; + +import org.gridgain.grid.util.typedef.internal.*; + +import java.io.*; + +/** + * Record resolver based on new line detection. This resolver can detect new lines based on '\n' or '\r\n' sequences. + * <p> + * Note that this resolver cannot be created and has one constant implementations: {@link #NEW_LINE}. + */ +public class IgniteFsNewLineRecordResolver extends IgniteFsByteDelimiterRecordResolver { + /** */ + private static final long serialVersionUID = 0L; + + /** + * Singleton new line resolver. This resolver will resolve records based on new lines + * regardless if they have '\n' or '\r\n' patterns. + */ + public static final IgniteFsNewLineRecordResolver NEW_LINE = new IgniteFsNewLineRecordResolver(true); + + /** CR symbol. */ + public static final byte SYM_CR = 0x0D; + + /** LF symbol. */ + public static final byte SYM_LF = 0x0A; + + /** + * Empty constructor required for {@link Externalizable} support. + */ + public IgniteFsNewLineRecordResolver() { + // No-op. + } + + /** + * Creates new-line record resolver. + * + * @param b Artificial flag to differentiate from empty constructor. + */ + @SuppressWarnings("UnusedParameters") + private IgniteFsNewLineRecordResolver(boolean b) { + super(new byte[] { SYM_CR, SYM_LF }, new byte[] { SYM_LF }); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgniteFsNewLineRecordResolver.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/243e521e/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/IgniteFsStringDelimiterRecordResolver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/IgniteFsStringDelimiterRecordResolver.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/IgniteFsStringDelimiterRecordResolver.java new file mode 100644 index 0000000..3a42333 --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/IgniteFsStringDelimiterRecordResolver.java @@ -0,0 +1,76 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.ggfs.mapreduce.records; + +import org.gridgain.grid.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.nio.charset.*; + +/** + * Record resolver based on delimiters represented as strings. Works in the same way as + * {@link IgniteFsByteDelimiterRecordResolver}, but uses strings as delimiters instead of byte arrays. + */ +public class IgniteFsStringDelimiterRecordResolver extends IgniteFsByteDelimiterRecordResolver { + /** */ + private static final long serialVersionUID = 0L; + + /** + * Converts string delimiters to byte delimiters. + * + * @param charset Charset. + * @param delims String delimiters. + * @return Byte delimiters. + */ + @Nullable private static byte[][] toBytes(Charset charset, @Nullable String... delims) { + byte[][] res = null; + + if (delims != null) { + res = new byte[delims.length][]; + + for (int i = 0; i < delims.length; i++) + res[i] = delims[i].getBytes(charset); + } + + return res; + } + + /** + * Empty constructor required for {@link Externalizable} support. + */ + public IgniteFsStringDelimiterRecordResolver() { + // No-op. + } + + /** + * Creates record resolver from given string and given charset. + * + * @param delims Delimiters. + * @param charset Charset. + */ + public IgniteFsStringDelimiterRecordResolver(Charset charset, String... delims) { + super(toBytes(charset, delims)); + } + + /** + * Creates record resolver based on given string with default charset. + * + * @param delims Delimiters. + */ + public IgniteFsStringDelimiterRecordResolver(String... delims) { + super(toBytes(Charset.defaultCharset(), delims)); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgniteFsStringDelimiterRecordResolver.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/243e521e/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsImpl.java index dff827e..5d4f4ad 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsImpl.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsImpl.java @@ -1722,7 +1722,7 @@ public final class GridGgfsImpl implements GridGgfsEx { */ <T, R> IgniteFuture<R> executeAsync(IgniteFsTask<T, R> task, @Nullable IgniteFsRecordResolver rslvr, Collection<IgniteFsPath> paths, boolean skipNonExistentFiles, long maxRangeLen, @Nullable T arg) { - return ggfsCtx.kernalContext().task().execute(task, new GridGgfsTaskArgsImpl<>(cfg.getName(), paths, rslvr, + return ggfsCtx.kernalContext().task().execute(task, new IgniteFsTaskArgsImpl<>(cfg.getName(), paths, rslvr, skipNonExistentFiles, maxRangeLen, arg)); } @@ -1757,7 +1757,7 @@ public final class GridGgfsImpl implements GridGgfsEx { @Nullable IgniteFsRecordResolver rslvr, Collection<IgniteFsPath> paths, boolean skipNonExistentFiles, long maxRangeLen, @Nullable T arg) { return ggfsCtx.kernalContext().task().execute((Class<IgniteFsTask<T, R>>)taskCls, - new GridGgfsTaskArgsImpl<>(cfg.getName(), paths, rslvr, skipNonExistentFiles, maxRangeLen, arg)); + new IgniteFsTaskArgsImpl<>(cfg.getName(), paths, rslvr, skipNonExistentFiles, maxRangeLen, arg)); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/243e521e/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsTaskArgsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsTaskArgsImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsTaskArgsImpl.java deleted file mode 100644 index 44cac9d..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsTaskArgsImpl.java +++ /dev/null @@ -1,127 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.kernal.processors.ggfs; - -import org.gridgain.grid.ggfs.*; -import org.gridgain.grid.ggfs.mapreduce.*; -import org.gridgain.grid.util.typedef.internal.*; - -import java.io.*; -import java.util.*; - -/** - * GGFS task arguments implementation. - */ -public class GridGgfsTaskArgsImpl<T> implements GridGgfsTaskArgs<T>, Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** GGFS name. */ - private String ggfsName; - - /** Paths. */ - private Collection<IgniteFsPath> paths; - - /** Record resolver. */ - private IgniteFsRecordResolver recRslvr; - - /** Skip non existent files flag. */ - private boolean skipNonExistentFiles; - - /** Maximum range length. */ - private long maxRangeLen; - - /** User argument. */ - private T usrArg; - - /** - * {@link Externalizable} support. - */ - public GridGgfsTaskArgsImpl() { - // No-op. - } - - /** - * Constructor. - * - * @param ggfsName GGFS name. - * @param paths Paths. - * @param recRslvr Record resolver. - * @param skipNonExistentFiles Skip non existent files flag. - * @param maxRangeLen Maximum range length. - * @param usrArg User argument. - */ - public GridGgfsTaskArgsImpl(String ggfsName, Collection<IgniteFsPath> paths, IgniteFsRecordResolver recRslvr, - boolean skipNonExistentFiles, long maxRangeLen, T usrArg) { - this.ggfsName = ggfsName; - this.paths = paths; - this.recRslvr = recRslvr; - this.skipNonExistentFiles = skipNonExistentFiles; - this.maxRangeLen = maxRangeLen; - this.usrArg = usrArg; - } - - /** {@inheritDoc} */ - @Override public String ggfsName() { - return ggfsName; - } - - /** {@inheritDoc} */ - @Override public Collection<IgniteFsPath> paths() { - return paths; - } - - /** {@inheritDoc} */ - @Override public IgniteFsRecordResolver recordResolver() { - return recRslvr; - } - - /** {@inheritDoc} */ - @Override public boolean skipNonExistentFiles() { - return skipNonExistentFiles; - } - - /** {@inheritDoc} */ - @Override public long maxRangeLength() { - return maxRangeLen; - } - - /** {@inheritDoc} */ - @Override public T userArgument() { - return usrArg; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridGgfsTaskArgsImpl.class, this); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeString(out, ggfsName); - U.writeCollection(out, paths); - - out.writeObject(recRslvr); - out.writeBoolean(skipNonExistentFiles); - out.writeLong(maxRangeLen); - out.writeObject(usrArg); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - ggfsName = U.readString(in); - paths = U.readCollection(in); - - recRslvr = (IgniteFsRecordResolver)in.readObject(); - skipNonExistentFiles = in.readBoolean(); - maxRangeLen = in.readLong(); - usrArg = (T)in.readObject(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/243e521e/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/IgniteFsTaskArgsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/IgniteFsTaskArgsImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/IgniteFsTaskArgsImpl.java new file mode 100644 index 0000000..1edaac8 --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/IgniteFsTaskArgsImpl.java @@ -0,0 +1,127 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.kernal.processors.ggfs; + +import org.gridgain.grid.ggfs.*; +import org.gridgain.grid.ggfs.mapreduce.*; +import org.gridgain.grid.util.typedef.internal.*; + +import java.io.*; +import java.util.*; + +/** + * GGFS task arguments implementation. + */ +public class IgniteFsTaskArgsImpl<T> implements IgniteFsTaskArgs<T>, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** GGFS name. */ + private String ggfsName; + + /** Paths. */ + private Collection<IgniteFsPath> paths; + + /** Record resolver. */ + private IgniteFsRecordResolver recRslvr; + + /** Skip non existent files flag. */ + private boolean skipNonExistentFiles; + + /** Maximum range length. */ + private long maxRangeLen; + + /** User argument. */ + private T usrArg; + + /** + * {@link Externalizable} support. + */ + public IgniteFsTaskArgsImpl() { + // No-op. + } + + /** + * Constructor. + * + * @param ggfsName GGFS name. + * @param paths Paths. + * @param recRslvr Record resolver. + * @param skipNonExistentFiles Skip non existent files flag. + * @param maxRangeLen Maximum range length. + * @param usrArg User argument. + */ + public IgniteFsTaskArgsImpl(String ggfsName, Collection<IgniteFsPath> paths, IgniteFsRecordResolver recRslvr, + boolean skipNonExistentFiles, long maxRangeLen, T usrArg) { + this.ggfsName = ggfsName; + this.paths = paths; + this.recRslvr = recRslvr; + this.skipNonExistentFiles = skipNonExistentFiles; + this.maxRangeLen = maxRangeLen; + this.usrArg = usrArg; + } + + /** {@inheritDoc} */ + @Override public String ggfsName() { + return ggfsName; + } + + /** {@inheritDoc} */ + @Override public Collection<IgniteFsPath> paths() { + return paths; + } + + /** {@inheritDoc} */ + @Override public IgniteFsRecordResolver recordResolver() { + return recRslvr; + } + + /** {@inheritDoc} */ + @Override public boolean skipNonExistentFiles() { + return skipNonExistentFiles; + } + + /** {@inheritDoc} */ + @Override public long maxRangeLength() { + return maxRangeLen; + } + + /** {@inheritDoc} */ + @Override public T userArgument() { + return usrArg; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgniteFsTaskArgsImpl.class, this); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeString(out, ggfsName); + U.writeCollection(out, paths); + + out.writeObject(recRslvr); + out.writeBoolean(skipNonExistentFiles); + out.writeLong(maxRangeLen); + out.writeObject(usrArg); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + ggfsName = U.readString(in); + paths = U.readCollection(in); + + recRslvr = (IgniteFsRecordResolver)in.readObject(); + skipNonExistentFiles = in.readBoolean(); + maxRangeLen = in.readLong(); + usrArg = (T)in.readObject(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/243e521e/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsTaskSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsTaskSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsTaskSelfTest.java index 9443ee7..d962109 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsTaskSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsTaskSelfTest.java @@ -148,7 +148,7 @@ public class GridGgfsTaskSelfTest extends GridGgfsCommonAbstractTest { Long genLen = ggfs.info(FILE).length(); IgniteBiTuple<Long, Integer> taskRes = ggfs.execute(new Task(), - new GridGgfsStringDelimiterRecordResolver(" "), Collections.singleton(FILE), arg); + new IgniteFsStringDelimiterRecordResolver(" "), Collections.singleton(FILE), arg); assert F.eq(genLen, taskRes.getKey()); assert F.eq(TOTAL_WORDS, taskRes.getValue()); @@ -176,7 +176,7 @@ public class GridGgfsTaskSelfTest extends GridGgfsCommonAbstractTest { Long genLen = ggfs.info(FILE).length(); assertNull(ggfsAsync.execute( - new Task(), new GridGgfsStringDelimiterRecordResolver(" "), Collections.singleton(FILE), arg)); + new Task(), new IgniteFsStringDelimiterRecordResolver(" "), Collections.singleton(FILE), arg)); IgniteFuture<IgniteBiTuple<Long, Integer>> fut = ggfsAsync.future(); @@ -231,7 +231,7 @@ public class GridGgfsTaskSelfTest extends GridGgfsCommonAbstractTest { private static class Task extends IgniteFsTask<String, IgniteBiTuple<Long, Integer>> { /** {@inheritDoc} */ @Override public IgniteFsJob createJob(IgniteFsPath path, IgniteFsFileRange range, - GridGgfsTaskArgs<String> args) throws GridException { + IgniteFsTaskArgs<String> args) throws GridException { return new Job(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/243e521e/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsByteDelimiterRecordResolverSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsByteDelimiterRecordResolverSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsByteDelimiterRecordResolverSelfTest.java index b88ccc2..0c20431 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsByteDelimiterRecordResolverSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsByteDelimiterRecordResolverSelfTest.java @@ -278,7 +278,7 @@ public class GridGgfsByteDelimiterRecordResolverSelfTest extends GridGgfsAbstrac byte[]... delims) throws Exception { write(data); - GridGgfsByteDelimiterRecordResolver rslvr = resolver(delims); + IgniteFsByteDelimiterRecordResolver rslvr = resolver(delims); IgniteFsFileRange split; @@ -304,7 +304,7 @@ public class GridGgfsByteDelimiterRecordResolverSelfTest extends GridGgfsAbstrac throws Exception { write(data); - GridGgfsByteDelimiterRecordResolver rslvr = resolver(delims); + IgniteFsByteDelimiterRecordResolver rslvr = resolver(delims); IgniteFsFileRange split; @@ -321,7 +321,7 @@ public class GridGgfsByteDelimiterRecordResolverSelfTest extends GridGgfsAbstrac * @param delims Delimiters. * @return Resolver. */ - private GridGgfsByteDelimiterRecordResolver resolver(byte[]... delims) { - return new GridGgfsByteDelimiterRecordResolver(delims); + private IgniteFsByteDelimiterRecordResolver resolver(byte[]... delims) { + return new IgniteFsByteDelimiterRecordResolver(delims); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/243e521e/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsFixedLengthRecordResolverSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsFixedLengthRecordResolverSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsFixedLengthRecordResolverSelfTest.java index 287e119..98b0abc 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsFixedLengthRecordResolverSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsFixedLengthRecordResolverSelfTest.java @@ -90,7 +90,7 @@ public class GridGgfsFixedLengthRecordResolverSelfTest extends GridGgfsAbstractR throws Exception { write(data); - GridGgfsFixedLengthRecordResolver rslvr = resolver(len); + IgniteFsFixedLengthRecordResolver rslvr = resolver(len); IgniteFsFileRange split; @@ -116,7 +116,7 @@ public class GridGgfsFixedLengthRecordResolverSelfTest extends GridGgfsAbstractR throws Exception { write(data); - GridGgfsFixedLengthRecordResolver rslvr = resolver(len); + IgniteFsFixedLengthRecordResolver rslvr = resolver(len); IgniteFsFileRange split; @@ -133,7 +133,7 @@ public class GridGgfsFixedLengthRecordResolverSelfTest extends GridGgfsAbstractR * @param len Length. * @return Resolver. */ - private GridGgfsFixedLengthRecordResolver resolver(int len) { - return new GridGgfsFixedLengthRecordResolver(len); + private IgniteFsFixedLengthRecordResolver resolver(int len) { + return new IgniteFsFixedLengthRecordResolver(len); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/243e521e/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsNewLineDelimiterRecordResolverSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsNewLineDelimiterRecordResolverSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsNewLineDelimiterRecordResolverSelfTest.java index 49328bb..4d3aae2 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsNewLineDelimiterRecordResolverSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsNewLineDelimiterRecordResolverSelfTest.java @@ -14,7 +14,7 @@ import org.gridgain.grid.ggfs.mapreduce.*; import org.gridgain.grid.ggfs.mapreduce.records.*; import org.gridgain.grid.util.typedef.*; -import static org.gridgain.grid.ggfs.mapreduce.records.GridGgfsNewLineRecordResolver.*; +import static org.gridgain.grid.ggfs.mapreduce.records.IgniteFsNewLineRecordResolver.*; /** * New line split resolver self test. @@ -74,7 +74,7 @@ public class GridGgfsNewLineDelimiterRecordResolverSelfTest extends GridGgfsAbst throws Exception { write(data); - GridGgfsNewLineRecordResolver rslvr = resolver(); + IgniteFsNewLineRecordResolver rslvr = resolver(); IgniteFsFileRange split; @@ -99,7 +99,7 @@ public class GridGgfsNewLineDelimiterRecordResolverSelfTest extends GridGgfsAbst throws Exception { write(data); - GridGgfsNewLineRecordResolver rslvr = resolver(); + IgniteFsNewLineRecordResolver rslvr = resolver(); IgniteFsFileRange split; @@ -115,7 +115,7 @@ public class GridGgfsNewLineDelimiterRecordResolverSelfTest extends GridGgfsAbst * * @return Resolver. */ - private GridGgfsNewLineRecordResolver resolver() { - return GridGgfsNewLineRecordResolver.NEW_LINE; + private IgniteFsNewLineRecordResolver resolver() { + return IgniteFsNewLineRecordResolver.NEW_LINE; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/243e521e/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsStringDelimiterRecordResolverSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsStringDelimiterRecordResolverSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsStringDelimiterRecordResolverSelfTest.java index bf31792..51dbf5f 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsStringDelimiterRecordResolverSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsStringDelimiterRecordResolverSelfTest.java @@ -80,7 +80,7 @@ public class GridGgfsStringDelimiterRecordResolverSelfTest extends GridGgfsAbstr String... delims) throws Exception { write(data); - GridGgfsByteDelimiterRecordResolver rslvr = resolver(delims); + IgniteFsByteDelimiterRecordResolver rslvr = resolver(delims); IgniteFsFileRange split; @@ -106,7 +106,7 @@ public class GridGgfsStringDelimiterRecordResolverSelfTest extends GridGgfsAbstr throws Exception { write(data); - GridGgfsStringDelimiterRecordResolver rslvr = resolver(delims); + IgniteFsStringDelimiterRecordResolver rslvr = resolver(delims); IgniteFsFileRange split; @@ -123,7 +123,7 @@ public class GridGgfsStringDelimiterRecordResolverSelfTest extends GridGgfsAbstr * @param delims Delimiters. * @return Resolver. */ - private GridGgfsStringDelimiterRecordResolver resolver(String... delims) { - return new GridGgfsStringDelimiterRecordResolver(UTF8, delims); + private IgniteFsStringDelimiterRecordResolver resolver(String... delims) { + return new IgniteFsStringDelimiterRecordResolver(UTF8, delims); } }