http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/PathFilters.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/PathFilters.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/PathFilters.java
new file mode 100644
index 0000000..19f78b5
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/PathFilters.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator.sequencefile;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+
+/**
+ * Supplies some useful and repeatedly-used instances of {@link PathFilter}.
+ */
+public final class PathFilters {
+
+  /** Accepts "part-*" output files, but not their ".crc" checksum companions. */
+  private static final PathFilter PART_FILE_INSTANCE = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      String name = path.getName();
+      return name.startsWith("part-") && !name.endsWith(".crc");
+    }
+  };
+
+  /**
+   * PathFilter to read the final clustering output: accepts names that start with "clusters-"
+   * and end with "-final".
+   */
+  private static final PathFilter CLUSTER_FINAL = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      String name = path.getName();
+      return name.startsWith("clusters-") && name.endsWith("-final");
+    }
+  };
+
+  /** Rejects ".crc" files and any name starting with "." or "_"; accepts everything else. */
+  private static final PathFilter LOGS_CRC_INSTANCE = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      String name = path.getName();
+      return !(name.endsWith(".crc") || name.startsWith(".") || name.startsWith("_"));
+    }
+  };
+
+  // Holder for static factory methods only; not instantiable.
+  private PathFilters() {
+  }
+
+  /**
+   * @return {@link PathFilter} that accepts paths whose file name starts with "part-". Excludes
+   * ".crc" files.
+   */
+  public static PathFilter partFilter() {
+    return PART_FILE_INSTANCE;
+  }
+
+  /**
+   * @return {@link PathFilter} that accepts paths whose file name starts with "clusters-" and ends with
+   * "-final", i.e. the final clustering output.
+   */
+  public static PathFilter finalPartFilter() {
+    return CLUSTER_FINAL;
+  }
+
+  /**
+   * @return {@link PathFilter} that rejects paths whose file name starts with "_" (e.g. Cloudera
+   * _SUCCESS files or Hadoop _logs), or "." (e.g. local hidden files), or ends with ".crc"
+   */
+  public static PathFilter logsCRCFilter() {
+    return LOGS_CRC_INSTANCE;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/PathType.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/PathType.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/PathType.java
new file mode 100644
index 0000000..7ea713e
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/PathType.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator.sequencefile;
+
+/**
+ * Tells {@link SequenceFileDirIterable} and its relatives how to interpret the input path: as a
+ * glob pattern to expand, or as a directory whose entries are listed.
+ */
+public enum PathType {
+  /** The input path is a glob pattern. */
+  GLOB,
+  /** The input path is a directory whose entries are listed. */
+  LIST
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterable.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterable.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterable.java
new file mode 100644
index 0000000..ca4d6b8
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterable.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator.sequencefile;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.common.Pair;
+
+/**
+ * <p>{@link Iterable} counterpart to {@link SequenceFileDirIterator}.</p>
+ *
+ * <p>Every call to {@link #iterator()} creates a new {@link SequenceFileDirIterator} over the files
+ * selected by the constructor arguments.</p>
+ */
+public final class SequenceFileDirIterable<K extends Writable,V extends Writable> implements Iterable<Pair<K,V>> {
+
+  private final Path path;
+  private final PathType pathType;
+  private final PathFilter filter;
+  private final Comparator<FileStatus> ordering;
+  private final boolean reuseKeyValueInstances;
+  private final Configuration conf;
+
+  /**
+   * Iterates over every file matched by {@code path}, with no filter, no explicit ordering and no reuse of
+   * key/value instances.
+   */
+  public SequenceFileDirIterable(Path path, PathType pathType, Configuration conf) {
+    this(path, pathType, null, conf);
+  }
+
+  /**
+   * Iterates over the files matched by {@code path} that {@code filter} accepts, with no explicit ordering
+   * and no reuse of key/value instances.
+   */
+  public SequenceFileDirIterable(Path path, PathType pathType, PathFilter filter, Configuration conf) {
+    this(path, pathType, filter, null, false, conf);
+  }
+
+  /**
+   * @param path directory or glob pattern to iterate over
+   * @param pathType whether or not to treat path as a directory ({@link PathType#LIST}) or
+   *  glob pattern ({@link PathType#GLOB})
+   * @param filter if not null, only sequence files accepted by this filter are iterated over
+   * @param ordering if not null, specifies the order in which to iterate over matching sequence files
+   * @param reuseKeyValueInstances if true, reuses the key and value instances instead of creating new
+   *  ones for each record read from a file
+   * @param conf Hadoop configuration used to access the file system and read the files
+   */
+  public SequenceFileDirIterable(Path path,
+                                 PathType pathType,
+                                 PathFilter filter,
+                                 Comparator<FileStatus> ordering,
+                                 boolean reuseKeyValueInstances,
+                                 Configuration conf) {
+    this.path = path;
+    this.pathType = pathType;
+    this.filter = filter;
+    this.ordering = ordering;
+    this.reuseKeyValueInstances = reuseKeyValueInstances;
+    this.conf = conf;
+  }
+
+  /**
+   * @return a new {@link SequenceFileDirIterator}; it is {@link java.io.Closeable}, so callers that stop
+   *  iterating early should close it to release any files it has opened
+   * @throws IllegalStateException wrapping the {@link IOException} if the matching files cannot be determined
+   */
+  @Override
+  public Iterator<Pair<K,V>> iterator() {
+    try {
+      return new SequenceFileDirIterator<>(path, pathType, filter, ordering, reuseKeyValueInstances, conf);
+    } catch (IOException ioe) {
+      throw new IllegalStateException(path.toString(), ioe);
+    }
+  }
+
+}
+
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterator.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterator.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterator.java
new file mode 100644
index 0000000..cf6a871
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterator.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator.sequencefile;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ForwardingIterator;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.IOUtils;
+import org.apache.mahout.common.Pair;
+
+/**
+ * Like {@link SequenceFileIterator}, but iterates not just over one sequence file, but many. The input path
+ * may be specified as a directory of files to read, or as a glob pattern. The set of files may be optionally
+ * restricted with a {@link PathFilter}.
+ *
+ * <p>Files are opened lazily, only when iteration reaches them. {@link #close()} closes every file opened
+ * so far, most recently opened first.</p>
+ */
+public final class SequenceFileDirIterator<K extends Writable,V extends Writable>
+  extends ForwardingIterator<Pair<K,V>> implements Closeable {
+
+  private static final FileStatus[] NO_STATUSES = new FileStatus[0];
+
+  private Iterator<Pair<K,V>> delegate;
+  /** Per-file iterators, appended in the order in which iteration opens them. */
+  private final List<SequenceFileIterator<K,V>> iterators;
+
+  /**
+   * Multifile sequence file iterator where files are specified explicitly by
+   * path parameters.
+   *
+   * @param path files to read, in iteration order; may be empty. All of them are resolved against the
+   *             file system of the first one.
+   * @param reuseKeyValueInstances if true, each per-file {@link SequenceFileIterator} reuses its key and
+   *                               value instances instead of creating new ones for every record
+   * @param conf Hadoop configuration used to access the file system and read the files
+   * @throws IOException if the status of any of the files cannot be obtained
+   */
+  public SequenceFileDirIterator(Path[] path,
+                                 boolean reuseKeyValueInstances,
+                                 Configuration conf) throws IOException {
+
+    iterators = Lists.newArrayList();
+    FileStatus[] statuses = NO_STATUSES;
+    // An empty array would otherwise fail on path[0]; treat it as "no files" instead.
+    if (path.length > 0) {
+      // we assume all files should exist, otherwise we will bail out.
+      FileSystem fs = FileSystem.get(path[0].toUri(), conf);
+      statuses = new FileStatus[path.length];
+      for (int i = 0; i < statuses.length; i++) {
+        statuses[i] = fs.getFileStatus(path[i]);
+      }
+    }
+    init(statuses, reuseKeyValueInstances, conf);
+  }
+
+  /**
+   * Constructor that uses either {@link FileSystem#listStatus(Path)} or
+   * {@link FileSystem#globStatus(Path)} to obtain list of files to iterate over
+   * (depending on pathType parameter).
+   *
+   * @param path directory to list or glob pattern to expand
+   * @param pathType whether {@code path} is a directory ({@link PathType#LIST}) or a glob ({@link PathType#GLOB})
+   * @param filter if not null, only files accepted by this filter are iterated over
+   * @param ordering if not null, the order in which the matching files are iterated over
+   * @param reuseKeyValueInstances if true, each per-file {@link SequenceFileIterator} reuses its key and
+   *                               value instances instead of creating new ones for every record
+   * @param conf Hadoop configuration used to access the file system and read the files
+   * @throws IOException if the matching files cannot be determined
+   */
+  public SequenceFileDirIterator(Path path,
+                                 PathType pathType,
+                                 PathFilter filter,
+                                 Comparator<FileStatus> ordering,
+                                 boolean reuseKeyValueInstances,
+                                 Configuration conf) throws IOException {
+
+    FileStatus[] statuses = HadoopUtil.getFileStatus(path, pathType, filter, ordering, conf);
+    iterators = Lists.newArrayList();
+    init(statuses, reuseKeyValueInstances, conf);
+  }
+
+  /**
+   * Builds the lazy, concatenated delegate over {@code statuses}. No file is opened here.
+   */
+  private void init(FileStatus[] statuses,
+                    final boolean reuseKeyValueInstances,
+                    final Configuration conf) {
+
+    /*
+     * prevent NPEs. Unfortunately, Hadoop would return null for list if nothing
+     * was qualified. In this case, which is a corner case, we should assume an
+     * empty iterator, not an NPE.
+     */
+    if (statuses == null) {
+      statuses = NO_STATUSES;
+    }
+
+    Iterator<FileStatus> fileStatusIterator = Iterators.forArray(statuses);
+
+    // Iterators.transform and Iterators.concat are lazy: each file is opened, and recorded in
+    // 'iterators', only when iteration reaches it. 'iterators' is therefore still empty when init returns.
+    Iterator<Iterator<Pair<K, V>>> fsIterators =
+      Iterators.transform(fileStatusIterator,
+        new Function<FileStatus, Iterator<Pair<K, V>>>() {
+          @Override
+          public Iterator<Pair<K, V>> apply(FileStatus from) {
+            try {
+              SequenceFileIterator<K, V> iterator = new SequenceFileIterator<>(from.getPath(),
+                  reuseKeyValueInstances, conf);
+              iterators.add(iterator);
+              return iterator;
+            } catch (IOException ioe) {
+              throw new IllegalStateException(from.getPath().toString(), ioe);
+            }
+          }
+        });
+
+    delegate = Iterators.concat(fsIterators);
+  }
+
+  @Override
+  protected Iterator<Pair<K,V>> delegate() {
+    return delegate;
+  }
+
+  /**
+   * Closes every file opened so far, most recently opened first, then forgets them so that a repeated
+   * call has nothing left to close.
+   */
+  @Override
+  public void close() throws IOException {
+    // 'iterators' is filled lazily in opening order, so the reversal must happen here, not in init().
+    List<SequenceFileIterator<K,V>> toClose = Lists.newArrayList(iterators);
+    Collections.reverse(toClose);
+    try {
+      IOUtils.close(toClose);
+    } finally {
+      iterators.clear();
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterable.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterable.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterable.java
new file mode 100644
index 0000000..1cb4ebc
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterable.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator.sequencefile;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * <p>{@link Iterable} counterpart to {@link SequenceFileDirValueIterator}: every call to
+ * {@link #iterator()} builds a new iterator over the values of the matching sequence files.</p>
+ */
+public final class SequenceFileDirValueIterable<V extends Writable> implements Iterable<V> {
+
+  private final Configuration conf;
+  private final Path path;
+  private final PathType pathType;
+  private final PathFilter filter;
+  private final Comparator<FileStatus> ordering;
+  private final boolean reuseKeyValueInstances;
+
+  /**
+   * Variant without a filter or an explicit ordering; a new value instance is created for each record.
+   */
+  public SequenceFileDirValueIterable(Path path, PathType pathType, Configuration conf) {
+    this(path, pathType, null, conf);
+  }
+
+  /**
+   * Variant with an optional filter but no explicit ordering; a new value instance is created for each
+   * record.
+   */
+  public SequenceFileDirValueIterable(Path path, PathType pathType, PathFilter filter, Configuration conf) {
+    this(path, pathType, filter, null, false, conf);
+  }
+
+  /**
+   * @param path directory to list or glob pattern to expand
+   * @param pathType {@link PathType#LIST} to list {@code path} as a directory, {@link PathType#GLOB} to
+   *  expand it as a glob pattern
+   * @param filter when non-null, only files this filter accepts are read
+   * @param ordering when non-null, the order in which the matching sequence files are read
+   * @param reuseKeyValueInstances when true, the same value object is refilled for every record instead
+   *  of allocating a new one
+   * @param conf Hadoop configuration used to access the file system and read the files
+   */
+  public SequenceFileDirValueIterable(Path path,
+                                      PathType pathType,
+                                      PathFilter filter,
+                                      Comparator<FileStatus> ordering,
+                                      boolean reuseKeyValueInstances,
+                                      Configuration conf) {
+    this.conf = conf;
+    this.reuseKeyValueInstances = reuseKeyValueInstances;
+    this.ordering = ordering;
+    this.filter = filter;
+    this.pathType = pathType;
+    this.path = path;
+  }
+
+  /**
+   * @return a fresh {@link SequenceFileDirValueIterator} over the matching files
+   * @throws IllegalStateException wrapping the {@link IOException} raised while building it
+   */
+  @Override
+  public Iterator<V> iterator() {
+    final SequenceFileDirValueIterator<V> valueIterator;
+    try {
+      valueIterator =
+          new SequenceFileDirValueIterator<>(path, pathType, filter, ordering, reuseKeyValueInstances, conf);
+    } catch (IOException e) {
+      throw new IllegalStateException(path.toString(), e);
+    }
+    return valueIterator;
+  }
+
+}
+
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterator.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterator.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterator.java
new file mode 100644
index 0000000..908c8bb
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterator.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator.sequencefile;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ForwardingIterator;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.common.IOUtils;
+
+/**
+ * Like {@link SequenceFileValueIterator}, but iterates not just over one
+ * sequence file, but many. The input path may be specified as a directory of
+ * files to read, or as a glob pattern. The set of files may be optionally
+ * restricted with a {@link PathFilter}.
+ *
+ * <p>Files are opened lazily, only when iteration reaches them. {@link #close()}
+ * closes every file opened so far, most recently opened first.</p>
+ */
+public final class SequenceFileDirValueIterator<V extends Writable> extends
+    ForwardingIterator<V> implements Closeable {
+
+  private static final FileStatus[] NO_STATUSES = new FileStatus[0];
+
+  private Iterator<V> delegate;
+  /** Per-file iterators, appended in the order in which iteration opens them. */
+  private final List<SequenceFileValueIterator<V>> iterators;
+
+  /**
+   * Constructor that uses either {@link FileSystem#listStatus(Path)} or
+   * {@link FileSystem#globStatus(Path)} to obtain list of files to iterate over
+   * (depending on pathType parameter).
+   *
+   * @param path directory to list or glob pattern to expand
+   * @param pathType whether {@code path} is listed ({@link PathType#LIST}) or globbed ({@link PathType#GLOB})
+   * @param filter if not null, only files accepted by this filter are iterated over
+   * @param ordering if not null, files are sorted with this comparator before iteration
+   * @param reuseKeyValueInstances passed to each {@link SequenceFileValueIterator}; if true, value
+   *                               instances are reused instead of allocated per record
+   * @param conf Hadoop configuration used to resolve the file system and read the files
+   * @throws IOException if the file system cannot be obtained or {@code path} cannot be listed/globbed
+   */
+  public SequenceFileDirValueIterator(Path path,
+                                      PathType pathType,
+                                      PathFilter filter,
+                                      Comparator<FileStatus> ordering,
+                                      boolean reuseKeyValueInstances,
+                                      Configuration conf) throws IOException {
+    FileStatus[] statuses;
+    FileSystem fs = FileSystem.get(path.toUri(), conf);
+    if (filter == null) {
+      statuses = pathType == PathType.GLOB ? fs.globStatus(path) : fs.listStatus(path);
+    } else {
+      statuses = pathType == PathType.GLOB ? fs.globStatus(path, filter) : fs.listStatus(path, filter);
+    }
+    iterators = Lists.newArrayList();
+    init(statuses, ordering, reuseKeyValueInstances, conf);
+  }
+
+  /**
+   * Multifile sequence file iterator where files are specified explicitly by
+   * path parameters.
+   *
+   * @param path files to read; may be empty. All of them are resolved against the file system of
+   *             the first one.
+   * @param ordering if not null, files are sorted with this comparator before iteration
+   * @param reuseKeyValueInstances passed to each {@link SequenceFileValueIterator}; if true, value
+   *                               instances are reused instead of allocated per record
+   * @param conf Hadoop configuration used to resolve the file system and read the files
+   * @throws IOException if the status of any of the files cannot be obtained
+   */
+  public SequenceFileDirValueIterator(Path[] path,
+                                      Comparator<FileStatus> ordering,
+                                      boolean reuseKeyValueInstances,
+                                      Configuration conf) throws IOException {
+
+    iterators = Lists.newArrayList();
+    FileStatus[] statuses = NO_STATUSES;
+    // An empty array would otherwise fail on path[0]; treat it as "no files" instead.
+    if (path.length > 0) {
+      /*
+       * we assume all files should exist, otherwise we will bail out.
+       */
+      FileSystem fs = FileSystem.get(path[0].toUri(), conf);
+      statuses = new FileStatus[path.length];
+      for (int i = 0; i < statuses.length; i++) {
+        statuses[i] = fs.getFileStatus(path[i]);
+      }
+    }
+    init(statuses, ordering, reuseKeyValueInstances, conf);
+  }
+
+  /**
+   * Optionally sorts {@code statuses} and builds the lazy, concatenated delegate over them.
+   * No file is opened here.
+   */
+  private void init(FileStatus[] statuses,
+                    Comparator<FileStatus> ordering,
+                    final boolean reuseKeyValueInstances,
+                    final Configuration conf) {
+
+    /*
+     * prevent NPEs. Unfortunately, Hadoop would return null for list if nothing
+     * was qualified. In this case, which is a corner case, we should assume an
+     * empty iterator, not an NPE.
+     */
+    if (statuses == null) {
+      statuses = NO_STATUSES;
+    }
+
+    if (ordering != null) {
+      Arrays.sort(statuses, ordering);
+    }
+    Iterator<FileStatus> fileStatusIterator = Iterators.forArray(statuses);
+
+    /*
+     * Iterators.transform and Iterators.concat are lazy: each file is opened, and
+     * recorded in 'iterators', only when iteration reaches it. Nothing is opened
+     * during construction, so there is nothing to clean up if construction fails.
+     * The previous try/finally here always closed an empty list.
+     */
+    Iterator<Iterator<V>> fsIterators =
+      Iterators.transform(fileStatusIterator,
+        new Function<FileStatus, Iterator<V>>() {
+          @Override
+          public Iterator<V> apply(FileStatus from) {
+            try {
+              SequenceFileValueIterator<V> iterator = new SequenceFileValueIterator<>(from.getPath(),
+                  reuseKeyValueInstances, conf);
+              iterators.add(iterator);
+              return iterator;
+            } catch (IOException ioe) {
+              throw new IllegalStateException(from.getPath().toString(), ioe);
+            }
+          }
+        });
+
+    delegate = Iterators.concat(fsIterators);
+  }
+
+  @Override
+  protected Iterator<V> delegate() {
+    return delegate;
+  }
+
+  /**
+   * Closes every file opened so far, most recently opened first, then forgets them so that a
+   * repeated call has nothing left to close.
+   */
+  @Override
+  public void close() throws IOException {
+    // 'iterators' is filled lazily in opening order, so the reversal must happen here.
+    List<SequenceFileValueIterator<V>> toClose = Lists.newArrayList(iterators);
+    Collections.reverse(toClose);
+    try {
+      IOUtils.close(toClose);
+    } finally {
+      iterators.clear();
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileIterable.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileIterable.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileIterable.java
new file mode 100644
index 0000000..f17c2a1
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileIterable.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator.sequencefile;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.common.Pair;
+
+/**
+ * <p>{@link Iterable} counterpart to {@link SequenceFileIterator}: each call to {@link #iterator()}
+ * opens the file again and returns a fresh iterator over its key/value pairs.</p>
+ */
+public final class SequenceFileIterable<K extends Writable,V extends Writable> implements Iterable<Pair<K,V>> {
+
+  private final Configuration conf;
+  private final Path path;
+  private final boolean reuseKeyValueInstances;
+
+  /**
+   * Like {@link #SequenceFileIterable(Path, boolean, Configuration)}, but a new key and value instance
+   * is created for every record.
+   *
+   * @param path file to iterate over
+   * @param conf Hadoop configuration used to open the file
+   */
+  public SequenceFileIterable(Path path, Configuration conf) {
+    this(path, false, conf);
+  }
+
+  /**
+   * @param path file to iterate over
+   * @param reuseKeyValueInstances when true, the same key and value objects are refilled for every record
+   *  instead of creating new ones
+   * @param conf Hadoop configuration used to open the file
+   */
+  public SequenceFileIterable(Path path, boolean reuseKeyValueInstances, Configuration conf) {
+    this.conf = conf;
+    this.reuseKeyValueInstances = reuseKeyValueInstances;
+    this.path = path;
+  }
+
+  /**
+   * @return a new {@link SequenceFileIterator} over {@code path}
+   * @throws IllegalStateException wrapping the {@link IOException} if the file cannot be opened
+   */
+  @Override
+  public Iterator<Pair<K, V>> iterator() {
+    final SequenceFileIterator<K, V> opened;
+    try {
+      opened = new SequenceFileIterator<>(path, reuseKeyValueInstances, conf);
+    } catch (IOException e) {
+      throw new IllegalStateException(path.toString(), e);
+    }
+    return opened;
+  }
+
+}
+
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileIterator.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileIterator.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileIterator.java
new file mode 100644
index
0000000..bc5c549
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileIterator.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator.sequencefile;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import com.google.common.collect.AbstractIterator;
+import com.google.common.io.Closeables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.mahout.common.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>{@link java.util.Iterator} over a {@link SequenceFile}'s keys and values, as a {@link Pair}
+ * containing key and value.</p>
+ *
+ * <p>The file is closed automatically once every record has been read or a read fails; call
+ * {@link #close()} to release it earlier. If the file's value class is {@link NullWritable}, only keys
+ * are read and the returned pairs carry a {@code null} value.</p>
+ */
+public final class SequenceFileIterator<K extends Writable,V extends Writable>
+  extends AbstractIterator<Pair<K,V>> implements Closeable {
+
+  private final SequenceFile.Reader reader;
+  private final Configuration conf;
+  private final Class<K> keyClass;
+  private final Class<V> valueClass;
+  /** True when the file's value class is NullWritable, in which case only keys are read. */
+  private final boolean noValue;
+  private K key;
+  private V value;
+  private final boolean reuseKeyValueInstances;
+
+  private static final Logger log = LoggerFactory.getLogger(SequenceFileIterator.class);
+
+  /**
+   * @param path sequence file to read
+   * @param reuseKeyValueInstances if true, the same key and value instances are refilled for every record
+   *                               instead of creating new ones, so callers must copy anything they keep
+   * @param conf Hadoop configuration used to resolve the file system and instantiate keys and values
+   * @throws IOException if path can't be opened or its header can't be read
+   */
+  @SuppressWarnings("unchecked") // the reader reports the key/value classes the caller declared as K and V
+  public SequenceFileIterator(Path path, boolean reuseKeyValueInstances, Configuration conf) throws IOException {
+    key = null;
+    value = null;
+    FileSystem fs = path.getFileSystem(conf);
+    // FileSystem#makeQualified replaces the deprecated Path#makeQualified(FileSystem) and matches
+    // SequenceFileValueIterator.
+    path = fs.makeQualified(path);
+    reader = new SequenceFile.Reader(fs, path, conf);
+    this.conf = conf;
+    keyClass = (Class<K>) reader.getKeyClass();
+    valueClass = (Class<V>) reader.getValueClass();
+    noValue = NullWritable.class.equals(valueClass);
+    this.reuseKeyValueInstances = reuseKeyValueInstances;
+  }
+
+  public Class<K> getKeyClass() {
+    return keyClass;
+  }
+
+  public Class<V> getValueClass() {
+    return valueClass;
+  }
+
+  /**
+   * Closes the underlying reader (an IOException from it is swallowed) and ends the iteration.
+   */
+  @Override
+  public void close() throws IOException {
+    key = null;
+    value = null;
+    Closeables.close(reader, true);
+
+    endOfData();
+  }
+
+  /**
+   * Reads the next record. Without reuse, a fresh key (and value) is created for every record;
+   * with reuse they are created once and then refilled.
+   */
+  @Override
+  protected Pair<K,V> computeNext() {
+    // Test 'key', not 'value': for NullWritable-valued files 'value' stays null, so testing it would
+    // allocate a new key for every record even when reuse was requested.
+    if (!reuseKeyValueInstances || key == null) {
+      key = ReflectionUtils.newInstance(keyClass, conf);
+      if (!noValue) {
+        value = ReflectionUtils.newInstance(valueClass, conf);
+      }
+    }
+    try {
+      boolean available;
+      if (noValue) {
+        available = reader.next(key);
+      } else {
+        available = reader.next(key, value);
+      }
+      if (!available) {
+        // close() calls endOfData(), so this null is AbstractIterator's end-of-data marker.
+        close();
+        return null;
+      }
+      return new Pair<>(key, value);
+    } catch (IOException ioe) {
+      try {
+        close();
+      } catch (IOException e) {
+        log.error(e.getMessage(), e);
+      }
+      throw new IllegalStateException(ioe);
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileValueIterable.java
----------------------------------------------------------------------
diff --git
a/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileValueIterable.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileValueIterable.java
new file mode 100644
index 0000000..d2fdf8d
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileValueIterable.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator.sequencefile;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * <p>{@link Iterable} counterpart to {@link SequenceFileValueIterator}: each call to {@link #iterator()}
+ * opens the file again and returns a fresh iterator over its values.</p>
+ */
+public final class SequenceFileValueIterable<V extends Writable> implements Iterable<V> {
+
+  private final Configuration conf;
+  private final Path path;
+  private final boolean reuseKeyValueInstances;
+
+  /**
+   * Like {@link #SequenceFileValueIterable(Path, boolean, Configuration)}, but a new value instance is
+   * created for every record.
+   *
+   * @param path file to iterate over
+   * @param conf Hadoop configuration used to open the file
+   */
+  public SequenceFileValueIterable(Path path, Configuration conf) {
+    this(path, false, conf);
+  }
+
+  /**
+   * @param path file to iterate over
+   * @param reuseKeyValueInstances when true, the same value object is refilled for every record instead
+   *  of creating a new one
+   * @param conf Hadoop configuration used to open the file
+   */
+  public SequenceFileValueIterable(Path path, boolean reuseKeyValueInstances, Configuration conf) {
+    this.conf = conf;
+    this.reuseKeyValueInstances = reuseKeyValueInstances;
+    this.path = path;
+  }
+
+  /**
+   * @return a new {@link SequenceFileValueIterator} over {@code path}
+   * @throws IllegalStateException wrapping the {@link IOException} if the file cannot be opened
+   */
+  @Override
+  public Iterator<V> iterator() {
+    final SequenceFileValueIterator<V> opened;
+    try {
+      opened = new SequenceFileValueIterator<>(path, reuseKeyValueInstances, conf);
+    } catch (IOException e) {
+      throw new IllegalStateException(path.toString(), e);
+    }
+    return opened;
+  }
+
+}
+
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileValueIterator.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileValueIterator.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileValueIterator.java
new file mode 100644
index 0000000..cb2295c
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileValueIterator.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator.sequencefile;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import com.google.common.collect.AbstractIterator;
+import com.google.common.io.Closeables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>{@link java.util.Iterator} over a {@link SequenceFile}'s values only.</p>
+ *
+ * <p>The underlying reader is closed when the last record has been read, when a read fails, or when
+ * {@link #close()} is called explicitly.</p>
+ *
+ * @param <V> type of the values stored in the file
+ */
+public final class SequenceFileValueIterator<V extends Writable> extends AbstractIterator<V> implements Closeable {
+
+  private final SequenceFile.Reader reader;
+  private final Configuration conf;
+  private final Class<V> valueClass;
+  /** Scratch key instance: every record's key is read into it and then ignored. */
+  private final Writable key;
+  private V value;
+  private final boolean reuseKeyValueInstances;
+
+  private static final Logger log = LoggerFactory.getLogger(SequenceFileValueIterator.class);
+
+  /**
+   * Opens {@code path} for reading.
+   *
+   * @param path file to read
+   * @param reuseKeyValueInstances if {@code true}, the same value instance is refilled on every read, so callers
+   *   must copy values they want to keep
+   * @param conf Hadoop configuration used to resolve the file system and instantiate Writables
+   * @throws IOException if path can't be read. If the key class cannot be instantiated, the exception from
+   *   {@link ReflectionUtils#newInstance} propagates after the reader has been closed.
+   */
+  public SequenceFileValueIterator(Path path, boolean reuseKeyValueInstances, Configuration conf) throws IOException {
+    value = null;
+    FileSystem fs = path.getFileSystem(conf);
+    path = fs.makeQualified(path);
+    reader = new SequenceFile.Reader(fs, path, conf);
+    this.conf = conf;
+    boolean initialized = false;
+    try {
+      @SuppressWarnings("unchecked")
+      Class<? extends Writable> keyClass = (Class<? extends Writable>) reader.getKeyClass();
+      key = ReflectionUtils.newInstance(keyClass, conf);
+      @SuppressWarnings("unchecked")
+      Class<V> fileValueClass = (Class<V>) reader.getValueClass();
+      valueClass = fileValueClass;
+      initialized = true;
+    } finally {
+      // The reader is already open at this point; don't leak it if the key class cannot be instantiated.
+      if (!initialized) {
+        Closeables.close(reader, true);
+      }
+    }
+    this.reuseKeyValueInstances = reuseKeyValueInstances;
+  }
+
+  /** @return the value class recorded in the file header */
+  public Class<V> getValueClass() {
+    return valueClass;
+  }
+
+  /** Closes the underlying reader (swallowing close errors) and marks this iterator as exhausted. */
+  @Override
+  public void close() throws IOException {
+    value = null;
+    Closeables.close(reader, true);
+    endOfData();
+  }
+
+  @Override
+  protected V computeNext() {
+    // Allocate a fresh value unless instances are being reused and one already exists.
+    if (!reuseKeyValueInstances || value == null) {
+      value = ReflectionUtils.newInstance(valueClass, conf);
+    }
+    try {
+      boolean available = reader.next(key, value);
+      if (!available) {
+        close();
+        return null;
+      }
+      return value;
+    } catch (IOException ioe) {
+      try {
+        close();
+      } catch (IOException e) {
+        log.error(e.getMessage(), e);
+      }
+      throw new IllegalStateException(ioe);
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/lucene/AnalyzerUtils.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/lucene/AnalyzerUtils.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/lucene/AnalyzerUtils.java
new file mode 100644
index 0000000..742d6cf
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/lucene/AnalyzerUtils.java
@@ -0,0 +1,61 @@
+package org.apache.mahout.common.lucene;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.util.Version;
+import org.apache.mahout.common.ClassUtils;
+
+/**
+ * Factory helpers for instantiating Lucene {@link Analyzer}s by class or by class name.
+ */
+public final class AnalyzerUtils {
+
+  /** Lucene version handed to analyzers when the caller does not specify one. */
+  private static final Version DEFAULT_VERSION = Version.LUCENE_5_5_2;
+
+  private AnalyzerUtils() {}
+
+  /**
+   * Create an Analyzer using the default Lucene version ({@link Version#LUCENE_5_5_2}). Note, if you need to pass
+   * in parameters to your constructor, you will need to wrap it in an implementation that does not take any
+   * arguments.
+   * @param analyzerClassName fully qualified name of a Lucene {@link Analyzer} subclass
+   * @return a new analyzer instance
+   * @throws ClassNotFoundException if no class with that name can be loaded
+   */
+  public static Analyzer createAnalyzer(String analyzerClassName) throws ClassNotFoundException {
+    return createAnalyzer(analyzerClassName, DEFAULT_VERSION);
+  }
+
+  /**
+   * Create an Analyzer from its class name, preferring a constructor that takes a {@link Version}.
+   * @param analyzerClassName fully qualified name of a Lucene {@link Analyzer} subclass
+   * @param version Lucene version passed to the analyzer's {@code (Version)} constructor, if it has one
+   * @return a new analyzer instance
+   * @throws ClassNotFoundException if no class with that name can be loaded
+   * @throws ClassCastException if the named class is not an {@link Analyzer}
+   */
+  public static Analyzer createAnalyzer(String analyzerClassName, Version version) throws ClassNotFoundException {
+    Class<? extends Analyzer> analyzerClass = Class.forName(analyzerClassName).asSubclass(Analyzer.class);
+    return createAnalyzer(analyzerClass, version);
+  }
+
+  /**
+   * Create an Analyzer using the default Lucene version ({@link Version#LUCENE_5_5_2}). Note, if you need to pass
+   * in parameters to your constructor, you will need to wrap it in an implementation that does not take any
+   * arguments.
+   * @param analyzerClass The Analyzer Class to instantiate
+   * @return a new analyzer instance
+   */
+  public static Analyzer createAnalyzer(Class<? extends Analyzer> analyzerClass) {
+    return createAnalyzer(analyzerClass, DEFAULT_VERSION);
+  }
+
+  /**
+   * Instantiate {@code analyzerClass}, first via a {@code (Version)} constructor and otherwise via its no-arg
+   * constructor.
+   * @param analyzerClass The Analyzer Class to instantiate
+   * @param version Lucene version passed to the {@code (Version)} constructor
+   * @return a new analyzer instance
+   */
+  public static Analyzer createAnalyzer(Class<? extends Analyzer> analyzerClass, Version version) {
+    try {
+      return ClassUtils.instantiateAs(analyzerClass, Analyzer.class,
+          new Class<?>[] { Version.class }, new Object[] { version });
+    } catch (IllegalStateException e) {
+      // NOTE(review): assumes ClassUtils.instantiateAs reports a missing/failed (Version) constructor as
+      // IllegalStateException; in that case fall back to the no-arg constructor.
+      return ClassUtils.instantiateAs(analyzerClass, Analyzer.class);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/lucene/IteratorTokenStream.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/lucene/IteratorTokenStream.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/lucene/IteratorTokenStream.java
new file mode 100644
index 0000000..5facad8
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/lucene/IteratorTokenStream.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.lucene;
+
+import org.apache.lucene.analysis.TokenStream;
+import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
+
+import java.util.Iterator;
+
+/**
+ * {@link TokenStream} that turns each string produced by a supplied {@link Iterator} into one token,
+ * populating only the {@link CharTermAttribute}.
+ */
+public final class IteratorTokenStream extends TokenStream {
+  private final CharTermAttribute termAtt;
+  private final Iterator<String> iterator;
+
+  /** @param iterator source of token texts; one element is consumed per {@link #incrementToken()} call */
+  public IteratorTokenStream(Iterator<String> iterator) {
+    this.termAtt = addAttribute(CharTermAttribute.class);
+    this.iterator = iterator;
+  }
+
+  @Override
+  public boolean incrementToken() {
+    if (!iterator.hasNext()) {
+      return false;
+    }
+    clearAttributes();
+    termAtt.append(iterator.next());
+    return true;
+  }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/lucene/TokenStreamIterator.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/lucene/TokenStreamIterator.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/lucene/TokenStreamIterator.java
new file mode 100644
index 0000000..af60d8b
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/lucene/TokenStreamIterator.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.lucene;
+
+import com.google.common.collect.AbstractIterator;
+import org.apache.lucene.analysis.TokenStream;
+import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
+
+import java.io.IOException;
+
+/**
+ * Provide an Iterator for the tokens in a TokenStream.
+ *
+ * Note, it is the responsibility of the instantiating class to properly consume the
+ * {@link org.apache.lucene.analysis.TokenStream}. See the Lucene {@link org.apache.lucene.analysis.TokenStream}
+ * documentation for more information.
+ *
+ * <p>When the stream is exhausted this iterator calls {@link TokenStream#end()} and {@link TokenStream#close()}.
+ * If reading fails, it makes a best-effort attempt to close the stream before rethrowing.</p>
+ */
+//TODO: consider using the char/byte arrays (CharTermAttribute#buffer()) instead of strings.
+public final class TokenStreamIterator extends AbstractIterator<String> {
+
+  private final TokenStream tokenStream;
+
+  /** @param tokenStream stream to read; presumably already reset by the caller (see the class note) */
+  public TokenStreamIterator(TokenStream tokenStream) {
+    this.tokenStream = tokenStream;
+  }
+
+  @Override
+  protected String computeNext() {
+    try {
+      if (tokenStream.incrementToken()) {
+        return tokenStream.getAttribute(CharTermAttribute.class).toString();
+      }
+      tokenStream.end();
+      tokenStream.close();
+      return endOfData();
+    } catch (IOException e) {
+      closeQuietly(e);
+      throw new IllegalStateException("IO error while tokenizing", e);
+    }
+  }
+
+  /** Best-effort close after a failure; any secondary error is attached to {@code cause} as suppressed. */
+  private void closeQuietly(IOException cause) {
+    try {
+      tokenStream.close();
+    } catch (IOException | RuntimeException suppressed) {
+      cause.addSuppressed(suppressed);
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/mapreduce/MergeVectorsCombiner.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/mapreduce/MergeVectorsCombiner.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/mapreduce/MergeVectorsCombiner.java
new file mode 100644
index 0000000..8e0385d
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/mapreduce/MergeVectorsCombiner.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.mapreduce;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.math.VectorWritable;
+
+import java.io.IOException;
+
+/**
+ * Combiner that replaces all vectors emitted for one key with the single vector produced by
+ * {@code VectorWritable.merge}.
+ */
+public class MergeVectorsCombiner
+    extends Reducer<WritableComparable<?>,VectorWritable,WritableComparable<?>,VectorWritable> {
+
+  @Override
+  public void reduce(WritableComparable<?> key, Iterable<VectorWritable> vectors, Context ctx)
+    throws IOException, InterruptedException {
+    VectorWritable merged = VectorWritable.merge(vectors.iterator());
+    ctx.write(key, merged);
+  }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/mapreduce/MergeVectorsReducer.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/mapreduce/MergeVectorsReducer.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/mapreduce/MergeVectorsReducer.java
new file mode 100644
index 0000000..b8d5dea
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/mapreduce/MergeVectorsReducer.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.mapreduce;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.math.SequentialAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+import java.io.IOException;
+
+/**
+ * Reducer that combines all vectors for a key with {@code VectorWritable.merge} and writes the result
+ * as a {@link SequentialAccessSparseVector}.
+ */
+public class MergeVectorsReducer extends
+    Reducer<WritableComparable<?>,VectorWritable,WritableComparable<?>,VectorWritable> {
+
+  /** Output holder reused across reduce calls. */
+  private final VectorWritable result = new VectorWritable();
+
+  @Override
+  public void reduce(WritableComparable<?> key, Iterable<VectorWritable> vectors, Context ctx)
+    throws IOException, InterruptedException {
+    VectorWritable mergedWritable = VectorWritable.merge(vectors.iterator());
+    Vector sequential = new SequentialAccessSparseVector(mergedWritable.get());
+    result.set(sequential);
+    ctx.write(key, result);
+  }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/mapreduce/TransposeMapper.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/mapreduce/TransposeMapper.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/mapreduce/TransposeMapper.java
new file mode 100644
index 0000000..c6c3f05
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/mapreduce/TransposeMapper.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.mapreduce;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+import java.io.IOException;
+
+/**
+ * Map side of a transpose. For every non-zero entry at index {@code col} of the input row vector keyed by
+ * {@code row}, this emits key {@code col} with a single-entry vector that holds the entry's value at index
+ * {@code row}.
+ */
+public class TransposeMapper extends Mapper<IntWritable,VectorWritable,IntWritable,VectorWritable> {
+
+  /** Configuration key for the cardinality of the emitted vectors (presumably the input's row count). */
+  public static final String NEW_NUM_COLS_PARAM = TransposeMapper.class.getName() + ".newNumCols";
+
+  private int newNumCols;
+  /** Reusable output key, so the framework-supplied input key is never mutated. */
+  private final IntWritable outKey = new IntWritable();
+
+  @Override
+  protected void setup(Context ctx) throws IOException, InterruptedException {
+    // Falls back to Integer.MAX_VALUE when the job does not set NEW_NUM_COLS_PARAM.
+    newNumCols = ctx.getConfiguration().getInt(NEW_NUM_COLS_PARAM, Integer.MAX_VALUE);
+  }
+
+  @Override
+  protected void map(IntWritable r, VectorWritable v, Context ctx) throws IOException, InterruptedException {
+    int row = r.get();
+    for (Vector.Element e : v.get().nonZeroes()) {
+      RandomAccessSparseVector tmp = new RandomAccessSparseVector(newNumCols, 1);
+      tmp.setQuick(row, e.get());
+      outKey.set(e.index());
+      ctx.write(outKey, new VectorWritable(tmp));
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/mapreduce/VectorSumCombiner.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/mapreduce/VectorSumCombiner.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/mapreduce/VectorSumCombiner.java
new file mode 100644
index 0000000..1d93386
---
/dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/mapreduce/VectorSumCombiner.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.mapreduce;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.hadoop.similarity.cooccurrence.Vectors;
+
+import java.io.IOException;
+
+/**
+ * Combiner that replaces all vectors for a key with the single vector returned by {@code Vectors.sum}.
+ */
+public class VectorSumCombiner
+    extends Reducer<WritableComparable<?>, VectorWritable, WritableComparable<?>, VectorWritable> {
+
+  /** Output holder reused across reduce calls. */
+  private final VectorWritable result = new VectorWritable();
+
+  @Override
+  protected void reduce(WritableComparable<?> key, Iterable<VectorWritable> values, Context ctx)
+    throws IOException, InterruptedException {
+    VectorWritable out = result;
+    out.set(Vectors.sum(values.iterator()));
+    ctx.write(key, out);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/mapreduce/VectorSumReducer.java
----------------------------------------------------------------------
diff --git
a/community/mahout-mr/src/main/java/org/apache/mahout/common/mapreduce/VectorSumReducer.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/mapreduce/VectorSumReducer.java
new file mode 100644
index 0000000..97d3805
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/mapreduce/VectorSumReducer.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.mapreduce;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.hadoop.similarity.cooccurrence.Vectors;
+
+import java.io.IOException;
+
+/**
+ * Reducer that writes, for each key, a new {@link VectorWritable} wrapping {@code Vectors.sum} of its values.
+ */
+public class VectorSumReducer
+    extends Reducer<WritableComparable<?>, VectorWritable, WritableComparable<?>, VectorWritable> {
+
+  @Override
+  protected void reduce(WritableComparable<?> key, Iterable<VectorWritable> values, Context ctx)
+    throws IOException, InterruptedException {
+    VectorWritable total = new VectorWritable(Vectors.sum(values.iterator()));
+    ctx.write(key, total);
+  }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/nlp/NGrams.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/nlp/NGrams.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/nlp/NGrams.java
new file mode 100644
index 0000000..7adadc1
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/nlp/NGrams.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.nlp;
+
+import com.google.common.base.CharMatcher;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Generates word n-grams from a line whose tokens are separated by spaces or tabs. The tokens of an
+ * n-gram are joined with a single space.
+ */
+public class NGrams {
+
+  private static final Splitter SPACE_TAB = Splitter.on(CharMatcher.anyOf(" \t"));
+
+  private final String line;
+  private final int gramSize;
+
+  /**
+   * @param line text to split into tokens
+   * @param gramSize maximum number of tokens in a generated n-gram
+   */
+  public NGrams(String line, int gramSize) {
+    this.line = line;
+    this.gramSize = gramSize;
+  }
+
+  /**
+   * Treats the first token of the line as a label and generates n-grams from the remaining tokens.
+   *
+   * @return a single-entry map from the label to the generated n-grams
+   */
+  public Map<String,List<String>> generateNGrams() {
+    Map<String,List<String>> returnDocument = Maps.newHashMap();
+
+    Iterator<String> tokenizer = SPACE_TAB.split(line).iterator();
+    // Splitter (without omitEmptyStrings) yields at least one element, so this next() does not throw.
+    String labelName = tokenizer.next();
+    returnDocument.put(labelName, generate(tokenizer));
+    return returnDocument;
+  }
+
+  /** @return the n-grams generated from every token of the line */
+  public List<String> generateNGramsWithoutLabel() {
+    return generate(SPACE_TAB.split(line).iterator());
+  }
+
+  /**
+   * Shared n-gram loop. It keeps a sliding window of at most {@code gramSize} tokens and, after each token is
+   * added, emits every prefix of that window.
+   *
+   * NOTE(review): emitting window prefixes (rather than suffixes ending at the new token) means that for
+   * gramSize > 1 early tokens are emitted repeatedly and unigrams of the last tokens are never emitted;
+   * e.g. "a b c" with gramSize 2 yields [a, a, a b, b, b c]. Preserved as-is; confirm whether intended.
+   */
+  private List<String> generate(Iterator<String> tokens) {
+    List<String> grams = Lists.newArrayList();
+    List<String> window = Lists.newArrayList();
+    while (tokens.hasNext()) {
+      String nextToken = tokens.next();
+      if (window.size() == gramSize) {
+        window.remove(0);
+      }
+      window.add(nextToken);
+
+      StringBuilder gramBuilder = new StringBuilder();
+      for (String gram : window) {
+        gramBuilder.append(gram);
+        grams.add(gramBuilder.toString());
+        gramBuilder.append(' ');
+      }
+    }
+    return grams;
+  }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/AbstractParameter.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/AbstractParameter.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/AbstractParameter.java
new file mode 100644
index 0000000..f0a7aa8
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/AbstractParameter.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.parameters;
+
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Base {@link Parameter} holding a typed value together with its configuration key ({@code prefix + name}),
+ * description and the string form of its default value. It has no nested parameters.
+ *
+ * @param <T> type of the parameter value
+ */
+public abstract class AbstractParameter<T> implements Parameter<T> {
+
+  private T value;
+  private final String prefix;
+  private final String name;
+  private final String description;
+  private final Class<T> type;
+  private final String defaultValue;
+
+  /**
+   * @param type value class
+   * @param prefix configuration key prefix
+   * @param name configuration key suffix; an explicit value is looked up under {@code prefix + name}
+   * @param jobConf configuration from which an explicit value, if present, is parsed via setStringValue
+   * @param defaultValue initial value, kept unless {@code jobConf} holds an explicit one
+   * @param description human readable description
+   */
+  protected AbstractParameter(Class<T> type,
+                              String prefix,
+                              String name,
+                              Configuration jobConf,
+                              T defaultValue,
+                              String description) {
+    this.type = type;
+    this.prefix = prefix;
+    this.name = name;
+    this.description = description;
+
+    // getStringValue()/setStringValue() may be overridden and run here before any subclass field is
+    // initialized; every base field except defaultValue is assigned above so such overrides can use them.
+    this.value = defaultValue;
+    this.defaultValue = getStringValue();
+
+    String jobConfValue = jobConf.get(prefix + name);
+    if (jobConfValue != null) {
+      setStringValue(jobConfValue);
+    }
+
+  }
+
+  @Override
+  public void configure(Configuration jobConf) {
+    // nothing to do
+  }
+
+  @Override
+  public void createParameters(String prefix, Configuration jobConf) { }
+
+  /** @return {@code value.toString()}, or {@code null} when no value is set */
+  @Override
+  public String getStringValue() {
+    if (value == null) {
+      return null;
+    }
+    return value.toString();
+  }
+
+  /** @return an empty collection; plain parameters have no nested parameters */
+  @Override
+  public Collection<Parameter<?>> getParameters() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public String prefix() {
+    return prefix;
+  }
+
+  @Override
+  public String name() {
+    return name;
+  }
+
+  @Override
+  public String description() {
+    return description;
+  }
+
+  @Override
+  public Class<T> type() {
+    return type;
+  }
+
+  @Override
+  public String defaultValue() {
+    return defaultValue;
+  }
+
+  @Override
+  public T get() {
+    return value;
+  }
+
+  @Override
+  public void set(T value) {
+    this.value = value;
+  }
+
+  @Override
+  public String toString() {
+    if (value != null) {
+      return value.toString();
+    } else {
+      return super.toString();
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/ClassParameter.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/ClassParameter.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/ClassParameter.java
new file mode 100644
index 0000000..1d1c0bb
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/ClassParameter.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.parameters;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * {@link Parameter} whose value is a {@link Class}. Its string form is the class's fully qualified name,
+ * resolved back to a class with {@link Class#forName(String)}.
+ */
+public class ClassParameter extends AbstractParameter<Class> {
+
+  public ClassParameter(String prefix, String name, Configuration jobConf, Class<?> defaultValue, String description) {
+    super(Class.class, prefix, name, jobConf, defaultValue, description);
+  }
+
+  /** @throws IllegalStateException if no class named {@code stringValue} can be loaded */
+  @Override
+  public void setStringValue(String stringValue) {
+    Class<?> resolved;
+    try {
+      resolved = Class.forName(stringValue);
+    } catch (ClassNotFoundException e) {
+      throw new IllegalStateException(e);
+    }
+    set(resolved);
+  }
+
+  /** @return the held class's {@link Class#getName() name}, or {@code null} when no class is set */
+  @Override
+  public String getStringValue() {
+    Class<?> current = get();
+    return current == null ? null : current.getName();
+  }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/DoubleParameter.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/DoubleParameter.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/DoubleParameter.java
new file mode 100644
index 0000000..cb3efcf
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/DoubleParameter.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.common.parameters; + +import org.apache.hadoop.conf.Configuration; + +public class DoubleParameter extends AbstractParameter<Double> { + + public DoubleParameter(String prefix, String name, Configuration conf, double defaultValue, String description) { + super(Double.class, prefix, name, conf, defaultValue, description); + } + + @Override + public void setStringValue(String stringValue) { + set(Double.valueOf(stringValue)); + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/Parameter.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/Parameter.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/Parameter.java new file mode 100644 index 0000000..292fa27 --- /dev/null +++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/Parameter.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.common.parameters; + +/** + * An accessor to a parameters in the job. + * + * This is a composite entity that can it self contain more parameters. Say the parameters describes what + * DistanceMeasure class to use, once set this parameters would also produce the parameters available in that + * DistanceMeasure implementation. + */ +public interface Parameter<T> extends Parametered { + /** @return job configuration setting key prefix, e.g. 'org.apache.mahout.util.WeightedDistanceMeasure.' */ + String prefix(); + + /** @return configuration parameters name, e.g. 'weightsFile' */ + String name(); + + /** @return human readable description of parameters */ + String description(); + + /** @return value class type */ + Class<T> type(); + + /** + * @param stringValue + * value string representation + */ + void setStringValue(String stringValue); + + /** + * @return value string representation of current value + */ + String getStringValue(); + + /** + * @param value + * new parameters value + */ + void set(T value); + + /** @return current parameters value */ + T get(); + + /** @return value used if not set by consumer */ + String defaultValue(); +} http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/Parametered.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/Parametered.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/Parametered.java new file mode 100644 index 0000000..96c9457 --- /dev/null +++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/Parametered.java @@ -0,0 +1,206 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.common.parameters; + +import java.util.Collection; + +import org.apache.hadoop.conf.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Meta information and accessors for configuring a job. */ +public interface Parametered { + + Logger log = LoggerFactory.getLogger(Parametered.class); + + Collection<Parameter<?>> getParameters(); + + /** + * EXPERT: consumers should never have to call this method. It would be friendly visible to + * {@link ParameteredGeneralizations} if java supported it. Calling this method should create a new list of + * parameters and is called + * + * @param prefix + * ends with a dot if not empty. 
+ * @param jobConf + * configuration used for retrieving values + * @see ParameteredGeneralizations#configureParameters(String,Parametered,Configuration) + * invoking method + * @see ParameteredGeneralizations#configureParametersRecursively(Parametered,String,Configuration) + * invoking method + */ + void createParameters(String prefix, Configuration jobConf); + + void configure(Configuration config); + + /** "multiple inheritance" */ + final class ParameteredGeneralizations { + private ParameteredGeneralizations() { } + + public static void configureParameters(Parametered parametered, Configuration jobConf) { + configureParameters(parametered.getClass().getSimpleName() + '.', + parametered, jobConf); + + } + + /** + * Calls + * {@link Parametered#createParameters(String,org.apache.hadoop.conf.Configuration)} + * on parameter parmetered, and then recur down its composite tree to invoke + * {@link Parametered#createParameters(String,org.apache.hadoop.conf.Configuration)} + * and {@link Parametered#configure(org.apache.hadoop.conf.Configuration)} on + * each composite part. + * + * @param prefix + * ends with a dot if not empty. 
+ * @param parametered + * instance to be configured + * @param jobConf + * configuration used for retrieving values + */ + public static void configureParameters(String prefix, Parametered parametered, Configuration jobConf) { + parametered.createParameters(prefix, jobConf); + configureParametersRecursively(parametered, prefix, jobConf); + } + + private static void configureParametersRecursively(Parametered parametered, String prefix, Configuration jobConf) { + for (Parameter<?> parameter : parametered.getParameters()) { + if (log.isDebugEnabled()) { + log.debug("Configuring {}{}", prefix, parameter.name()); + } + String name = prefix + parameter.name() + '.'; + parameter.createParameters(name, jobConf); + parameter.configure(jobConf); + if (!parameter.getParameters().isEmpty()) { + configureParametersRecursively(parameter, name, jobConf); + } + } + } + + public static String help(Parametered parametered) { + return new Help(parametered).toString(); + } + + public static String conf(Parametered parametered) { + return new Conf(parametered).toString(); + } + + private static final class Help { + static final int NAME_DESC_DISTANCE = 8; + + private final StringBuilder sb; + private int longestName; + private int numChars = 100; // a few extra just to be sure + + private Help(Parametered parametered) { + recurseCount(parametered); + numChars += (longestName + NAME_DESC_DISTANCE) * parametered.getParameters().size(); + sb = new StringBuilder(numChars); + recurseWrite(parametered); + } + + @Override + public String toString() { + return sb.toString(); + } + + private void recurseCount(Parametered parametered) { + for (Parameter<?> parameter : parametered.getParameters()) { + int parameterNameLength = parameter.name().length(); + if (parameterNameLength > longestName) { + longestName = parameterNameLength; + } + recurseCount(parameter); + numChars += parameter.description().length(); + } + } + + private void recurseWrite(Parametered parametered) { + for (Parameter<?> 
parameter : parametered.getParameters()) { + sb.append(parameter.prefix()); + sb.append(parameter.name()); + int max = longestName - parameter.name().length() - parameter.prefix().length() + + NAME_DESC_DISTANCE; + for (int i = 0; i < max; i++) { + sb.append(' '); + } + sb.append(parameter.description()); + if (parameter.defaultValue() != null) { + sb.append(" (default value '"); + sb.append(parameter.defaultValue()); + sb.append("')"); + } + sb.append('\n'); + recurseWrite(parameter); + } + } + } + + private static final class Conf { + private final StringBuilder sb; + private int longestName; + private int numChars = 100; // a few extra just to be sure + + private Conf(Parametered parametered) { + recurseCount(parametered); + sb = new StringBuilder(numChars); + recurseWrite(parametered); + } + + @Override + public String toString() { + return sb.toString(); + } + + private void recurseCount(Parametered parametered) { + for (Parameter<?> parameter : parametered.getParameters()) { + int parameterNameLength = parameter.prefix().length() + parameter.name().length(); + if (parameterNameLength > longestName) { + longestName = parameterNameLength; + } + + numChars += parameterNameLength; + numChars += 5; // # $0\n$1 = $2\n\n + numChars += parameter.description().length(); + if (parameter.getStringValue() != null) { + numChars += parameter.getStringValue().length(); + } + + recurseCount(parameter); + } + } + + private void recurseWrite(Parametered parametered) { + for (Parameter<?> parameter : parametered.getParameters()) { + sb.append("# "); + sb.append(parameter.description()); + sb.append('\n'); + sb.append(parameter.prefix()); + sb.append(parameter.name()); + sb.append(" = "); + if (parameter.getStringValue() != null) { + sb.append(parameter.getStringValue()); + } + sb.append('\n'); + sb.append('\n'); + recurseWrite(parameter); + } + } + } + } +} 
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/PathParameter.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/PathParameter.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/PathParameter.java new file mode 100644 index 0000000..a617fe3 --- /dev/null +++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/PathParameter.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.common.parameters; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; + +public class PathParameter extends AbstractParameter<Path> { + + public PathParameter(String prefix, String name, Configuration jobConf, Path defaultValue, String description) { + super(Path.class, prefix, name, jobConf, defaultValue, description); + } + + @Override + public void setStringValue(String stringValue) { + set(new Path(stringValue)); + } +}
