http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8446c4b7/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiStartStopSelfTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiStartStopSelfTest.java index 58ffd3e,0000000..cfaedda mode 100644,000000..100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiStartStopSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiStartStopSelfTest.java @@@ -1,82 -1,0 +1,82 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp; + +import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.managers.security.*; +import org.apache.ignite.plugin.security.*; +import org.apache.ignite.spi.*; +import org.apache.ignite.spi.discovery.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.spi.*; + +import java.util.*; + +/** + * Grid TCP discovery SPI start stop self test. + */ +@GridSpiTest(spi = TcpDiscoverySpi.class, group = "Discovery SPI") +public class TcpDiscoverySpiStartStopSelfTest extends GridSpiStartStopAbstractTest<TcpDiscoverySpi> { + /** + * @return IP finder. + */ + @GridSpiTestConfig + public TcpDiscoveryIpFinder getIpFinder() { + return new TcpDiscoveryVmIpFinder(true); + } + + /** + * @return Discovery data collector. + */ + @GridSpiTestConfig + public DiscoverySpiDataExchange getDataExchange() { + return new DiscoverySpiDataExchange() { - @Override public List<Object> collect(UUID nodeId) { ++ @Override public Map<Integer, Object> collect(UUID nodeId) { + return null; + } + - @Override public void onExchange(List<Object> data) { ++ @Override public void onExchange(Map<Integer, Object> data) { + // No-op. + } + }; + } + + /** + * Discovery SPI authenticator. + * + * @return Authenticator. + */ + @GridSpiTestConfig + public DiscoverySpiNodeAuthenticator getAuthenticator() { + return new DiscoverySpiNodeAuthenticator() { + @Override public GridSecurityContext authenticateNode(ClusterNode n, GridSecurityCredentials cred) { + GridSecuritySubjectAdapter subj = new GridSecuritySubjectAdapter( + GridSecuritySubjectType.REMOTE_NODE, n.id()); + + subj.permissions(new GridAllowAllPermissionSet()); + + return new GridSecurityContext(subj); + } + + @Override public boolean isGlobalNodeAuthentication() { + return false; + } + }; + } +}
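The hunk above reflects the reworked DiscoverySpiDataExchange contract: collect(...) now returns discovery data as a Map<Integer, Object> keyed by an integer id instead of a flat List<Object>, and onExchange(...) consumes the same map. Below is a minimal sketch of a non-trivial implementation, under the assumption that the keys identify the internal component owning each piece of data; the component id constant and payload are illustrative, not part of this commit.

    import java.util.*;

    import org.apache.ignite.spi.discovery.*;

    /** Sketch only: a discovery data exchange keyed by component id. */
    class MapKeyedDataExchange implements DiscoverySpiDataExchange {
        /** Hypothetical id of the component that owns the payload. */
        private static final int CACHE_PROC_ID = 1;

        /** {@inheritDoc} */
        @Override public Map<Integer, Object> collect(UUID nodeId) {
            Map<Integer, Object> data = new HashMap<>();

            // Each component publishes its discovery data under its own key.
            data.put(CACHE_PROC_ID, "cache-config-snapshot");

            return data;
        }

        /** {@inheritDoc} */
        @Override public void onExchange(Map<Integer, Object> data) {
            // Route each entry to the component identified by its key.
            Object payload = data.get(CACHE_PROC_ID);

            if (payload != null)
                System.out.println("Received discovery data: " + payload);
        }
    }

A keyed map lets several components contribute data to a single discovery round without ordering assumptions, which the previous flat List could not express.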
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8446c4b7/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8446c4b7/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8446c4b7/modules/hadoop/src/main/java/org/apache/ignite/client/hadoop/GridHadoopClientProtocol.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8446c4b7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSetup.java ---------------------------------------------------------------------- diff --cc modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSetup.java index 0000000,41fed96..c99f42a mode 000000,100644..100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSetup.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSetup.java @@@ -1,0 -1,506 +1,506 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.internal.processors.hadoop; + + import org.apache.ignite.internal.util.typedef.*; + import org.apache.ignite.internal.util.typedef.internal.*; + + import java.io.*; + import java.net.*; + import java.nio.file.*; + import java.text.*; + import java.util.*; + + import static org.apache.ignite.internal.GridProductImpl.*; + + /** + * Setup tool to configure Hadoop client. + */ + public class GridHadoopSetup { + /** */ + public static final String WINUTILS_EXE = "winutils.exe"; + + /** */ + private static final FilenameFilter IGNITE_JARS = new FilenameFilter() { + @Override public boolean accept(File dir, String name) { + return name.startsWith("ignite-") && name.endsWith(".jar"); + } + }; + + /** + * The main method. + * @param ignore Params. + * @throws IOException If failed. + */ + public static void main(String[] ignore) throws IOException { + X.println( + " __________ ________________ ", + " / _/ ___/ |/ / _/_ __/ __/ ", + " _/ // (_ / // / / / / _/ ", + "/___/\\___/_/|_/___/ /_/ /___/ ", + " for Apache Hadoop ", + " "); + + println("Version " + ACK_VER); + + configureHadoop(); + } + + /** - * This operation prepares the clean unpacked Hadoop distributive to work as client with GridGain-Hadoop. ++ * This operation prepares the clean unpacked Hadoop distribution to work as a client with Ignite-Hadoop. 
+ * It performs these operations: + * <ul> + * <li>Check for setting of HADOOP_HOME environment variable.</li> + * <li>Try to resolve HADOOP_COMMON_HOME or evaluate it relative to HADOOP_HOME.</li> + * <li>In Windows check if winutils.exe exists and try to fix issue with some restrictions.</li> + * <li>In Windows check new line character issues in CMD scripts.</li> - * <li>Scan Hadoop lib directory to detect GridGain JARs. If these don't exist tries to create ones.</li> ++ * <li>Scan Hadoop lib directory to detect Ignite JARs. If these don't exist, the tool tries to create them.</li> + * </ul> + */ + private static void configureHadoop() { - String gridgainHome = U.getGridGainHome(); ++ String gridgainHome = U.getIgniteHome(); + + println("IGNITE_HOME is set to '" + gridgainHome + "'."); + + checkGridGainHome(gridgainHome); + + String homeVar = "HADOOP_HOME"; + String hadoopHome = System.getenv(homeVar); + + if (F.isEmpty(hadoopHome)) { + homeVar = "HADOOP_PREFIX"; + hadoopHome = System.getenv(homeVar); + } + + if (F.isEmpty(hadoopHome)) + exit("Neither HADOOP_HOME nor HADOOP_PREFIX environment variable is set. Please set one of them to a " + + "valid Hadoop installation directory and run setup tool again.", null); + + hadoopHome = hadoopHome.replaceAll("\"", ""); + + println(homeVar + " is set to '" + hadoopHome + "'."); + + String hiveHome = System.getenv("HIVE_HOME"); + + if (!F.isEmpty(hiveHome)) { + hiveHome = hiveHome.replaceAll("\"", ""); + + println("HIVE_HOME is set to '" + hiveHome + "'."); + } + + File hadoopDir = new File(hadoopHome); + + if (!hadoopDir.exists()) + exit("Hadoop installation folder does not exist.", null); + + if (!hadoopDir.isDirectory()) + exit("HADOOP_HOME must point to a directory.", null); + + if (!hadoopDir.canRead()) + exit("Hadoop installation folder cannot be read. Please check permissions.", null); + + File hadoopCommonDir; + + String hadoopCommonHome = System.getenv("HADOOP_COMMON_HOME"); + + if (F.isEmpty(hadoopCommonHome)) { + hadoopCommonDir = new File(hadoopDir, "share/hadoop/common"); + + println("HADOOP_COMMON_HOME is not set, will use '" + hadoopCommonDir.getPath() + "'."); + } + else { + println("HADOOP_COMMON_HOME is set to '" + hadoopCommonHome + "'."); + + hadoopCommonDir = new File(hadoopCommonHome); + } + + if (!hadoopCommonDir.canRead()) + exit("Failed to read Hadoop common dir in '" + hadoopCommonHome + "'.", null); + + File hadoopCommonLibDir = new File(hadoopCommonDir, "lib"); + + if (!hadoopCommonLibDir.canRead()) + exit("Failed to read Hadoop 'lib' folder in '" + hadoopCommonLibDir.getPath() + "'.", null); + + if (U.isWindows()) { + checkJavaPathSpaces(); + + File hadoopBinDir = new File(hadoopDir, "bin"); + + if (!hadoopBinDir.canRead()) + exit("Failed to read subdirectory 'bin' in HADOOP_HOME.", null); + + File winutilsFile = new File(hadoopBinDir, WINUTILS_EXE); + + if (!winutilsFile.exists()) { + if (ask("File '" + WINUTILS_EXE + "' does not exist. " + + "It may be replaced by a stub. Create it?")) { + println("Creating file stub '" + winutilsFile.getAbsolutePath() + "'."); + + boolean ok = false; + + try { + ok = winutilsFile.createNewFile(); + } + catch (IOException ignore) { + // No-op. + } + + if (!ok) + exit("Failed to create '" + WINUTILS_EXE + "' file. Please check permissions.", null); + } + else + println("Ok. 
But Hadoop client probably will not work on Windows this way..."); + } + + processCmdFiles(hadoopDir, "bin", "sbin", "libexec"); + } + + File gridgainLibs = new File(new File(gridgainHome), "libs"); + + if (!gridgainLibs.exists()) - exit("GridGain 'libs' folder is not found.", null); ++ exit("Ignite 'libs' folder is not found.", null); + + Collection<File> jarFiles = new ArrayList<>(); + + addJarsInFolder(jarFiles, gridgainLibs); + addJarsInFolder(jarFiles, new File(gridgainLibs, "gridgain-hadoop")); + + boolean jarsLinksCorrect = true; + + for (File file : jarFiles) { + File link = new File(hadoopCommonLibDir, file.getName()); + + jarsLinksCorrect &= isJarLinkCorrect(link, file); + + if (!jarsLinksCorrect) + break; + } + + if (!jarsLinksCorrect) { - if (ask("GridGain JAR files are not found in Hadoop 'lib' directory. " + ++ if (ask("Ignite JAR files are not found in Hadoop 'lib' directory. " + + "Create appropriate symbolic links?")) { + File[] oldGridGainJarFiles = hadoopCommonLibDir.listFiles(IGNITE_JARS); + - if (oldGridGainJarFiles.length > 0 && ask("The Hadoop 'lib' directory contains JARs from other GridGain " + ++ if (oldGridGainJarFiles.length > 0 && ask("The Hadoop 'lib' directory contains JARs from other Ignite " + + "installation. They must be deleted to continue. Continue?")) { + for (File file : oldGridGainJarFiles) { + println("Deleting file '" + file.getAbsolutePath() + "'."); + + if (!file.delete()) + exit("Failed to delete file '" + file.getPath() + "'.", null); + } + } + + for (File file : jarFiles) { + File targetFile = new File(hadoopCommonLibDir, file.getName()); + + try { + println("Creating symbolic link '" + targetFile.getAbsolutePath() + "'."); + + Files.createSymbolicLink(targetFile.toPath(), file.toPath()); + } + catch (IOException e) { + if (U.isWindows()) { + warn("Ability to create symbolic links is required!"); + warn("On Windows platform you have to grant permission 'Create symbolic links'"); + warn("to your user or run the Accelerator as Administrator."); + } + + exit("Creating symbolic link failed! Check permissions.", e); + } + } + } + else - println("Ok. But Hadoop client will not be able to talk to GridGain cluster without those JARs in classpath..."); ++ println("Ok. But Hadoop client will not be able to talk to Ignite cluster without those JARs in classpath..."); + } + + File hadoopEtc = new File(hadoopDir, "etc" + File.separator + "hadoop"); + + File gridgainDocs = new File(gridgainHome, "docs"); + + if (!gridgainDocs.canRead()) - exit("Failed to read GridGain 'docs' folder at '" + gridgainDocs.getAbsolutePath() + "'.", null); ++ exit("Failed to read Ignite 'docs' folder at '" + gridgainDocs.getAbsolutePath() + "'.", null); + + if (hadoopEtc.canWrite()) { // TODO Bigtop + if (ask("Replace 'core-site.xml' and 'mapred-site.xml' files with preconfigured templates " + + "(existing files will be backed up)?")) { - replaceWithBackup(new File(gridgainDocs, "core-site.gridgain.xml"), new File(hadoopEtc, "core-site.xml")); ++ replaceWithBackup(new File(gridgainDocs, "core-site.ignite.xml"), new File(hadoopEtc, "core-site.xml")); + - replaceWithBackup(new File(gridgainDocs, "mapred-site.gridgain.xml"), new File(hadoopEtc, "mapred-site.xml")); ++ replaceWithBackup(new File(gridgainDocs, "mapred-site.ignite.xml"), new File(hadoopEtc, "mapred-site.xml")); + } + else - println("Ok. You can configure them later, the templates are available at GridGain's 'docs' directory..."); ++ println("Ok. 
You can configure them later, the templates are available at Ignite's 'docs' directory..."); + } + + if (!F.isEmpty(hiveHome)) { + File hiveConfDir = new File(hiveHome + File.separator + "conf"); + + if (!hiveConfDir.canWrite()) + warn("Can not write to '" + hiveConfDir.getAbsolutePath() + "'. To run Hive queries you have to " + - "configure 'hive-site.xml' manually. The template is available at GridGain's 'docs' directory."); ++ "configure 'hive-site.xml' manually. The template is available at Ignite's 'docs' directory."); + else if (ask("Replace 'hive-site.xml' with preconfigured template (existing file will be backed up)?")) - replaceWithBackup(new File(gridgainDocs, "hive-site.gridgain.xml"), new File(hiveConfDir, "hive-site.xml")); ++ replaceWithBackup(new File(gridgainDocs, "hive-site.ignite.xml"), new File(hiveConfDir, "hive-site.xml")); + else - println("Ok. You can configure it later, the template is available at GridGain's 'docs' directory..."); ++ println("Ok. You can configure it later, the template is available at Ignite's 'docs' directory..."); + } + + println("Apache Hadoop setup is complete."); + } + + /** + * @param jarFiles Jars. + * @param folder Folder. + */ + private static void addJarsInFolder(Collection<File> jarFiles, File folder) { + if (!folder.exists()) + exit("Folder '" + folder.getAbsolutePath() + "' is not found.", null); + + jarFiles.addAll(Arrays.asList(folder.listFiles(IGNITE_JARS))); + } + + /** + * Checks that JAVA_HOME does not contain space characters. + */ + private static void checkJavaPathSpaces() { + String javaHome = System.getProperty("java.home"); + + if (javaHome.contains(" ")) { + warn("Java installation path contains space characters!"); + warn("Hadoop client will not be able to start using '" + javaHome + "'."); + warn("Please install JRE to path which does not contain spaces and point JAVA_HOME to that installation."); + } + } + + /** - * Checks GridGain home. ++ * Checks Ignite home. + * - * @param ggHome GridGain home. ++ * @param ggHome Ignite home. + */ + private static void checkGridGainHome(String ggHome) { + URL jarUrl = U.class.getProtectionDomain().getCodeSource().getLocation(); + + try { + Path jar = Paths.get(jarUrl.toURI()); + Path gg = Paths.get(ggHome); + + if (!jar.startsWith(gg)) - exit("GridGain JAR files are not under IGNITE_HOME.", null); ++ exit("Ignite JAR files are not under IGNITE_HOME.", null); + } + catch (Exception e) { + exit(e.getMessage(), e); + } + } + + /** + * Replaces target file with source file. + * + * @param from From. + * @param to To. + */ + private static void replaceWithBackup(File from, File to) { + if (!from.canRead()) + exit("Failed to read source file '" + from.getAbsolutePath() + "'.", null); + + println("Replacing file '" + to.getAbsolutePath() + "'."); + + try { + U.copy(from, renameToBak(to), true); + } + catch (IOException e) { + exit("Failed to replace file '" + to.getAbsolutePath() + "'.", e); + } + } + + /** + * Renames file for backup. + * + * @param file File. + * @return File. + */ + private static File renameToBak(File file) { + DateFormat fmt = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss"); + + if (file.exists() && !file.renameTo(new File(file.getAbsolutePath() + "." + fmt.format(new Date()) + ".bak"))) + exit("Failed to rename file '" + file.getPath() + "'.", null); + + return file; + } + + /** + * Checks if link is correct. + * + * @param link Symbolic link. + * @param correctTarget Correct link target. + * @return {@code true} If link target is correct. 
+ */ + private static boolean isJarLinkCorrect(File link, File correctTarget) { + if (!Files.isSymbolicLink(link.toPath())) + return false; // It is a real file or it does not exist. + + Path target = null; + + try { + target = Files.readSymbolicLink(link.toPath()); + } + catch (IOException e) { + exit("Failed to read symbolic link: " + link.getAbsolutePath(), e); + } + + return Files.exists(target) && target.toFile().equals(correctTarget); + } + + /** + * Writes the question and reads the boolean answer from the console. + * + * @param question Question to write. + * @return {@code true} if user inputs 'Y' or 'y', {@code false} otherwise. + */ + private static boolean ask(String question) { + X.println(); + X.print(" < " + question + " (Y/N): "); + + String answer = null; + + if (!F.isEmpty(System.getenv("IGNITE_HADOOP_SETUP_YES"))) + answer = "Y"; + else { + BufferedReader br = new BufferedReader(new InputStreamReader(System.in)); + + try { + answer = br.readLine(); + } + catch (IOException e) { + exit("Failed to read answer: " + e.getMessage(), e); + } + } + + if (answer != null && "Y".equals(answer.toUpperCase().trim())) { + X.println(" > Yes."); + + return true; + } + else { + X.println(" > No."); + + return false; + } + } + + /** + * Exit with message. + * + * @param msg Exit message. + */ + private static void exit(String msg, Exception e) { + X.println(" "); + X.println(" # " + msg); + X.println(" # Setup failed, exiting... "); + + if (e != null && !F.isEmpty(System.getenv("IGNITE_HADOOP_SETUP_DEBUG"))) + e.printStackTrace(); + + System.exit(1); + } + + /** + * Prints message. + * + * @param msg Message. + */ + private static void println(String msg) { + X.println(" > " + msg); + } + + /** + * Prints warning. + * + * @param msg Message. + */ + private static void warn(String msg) { + X.println(" ! " + msg); + } + + /** + * Checks that CMD files have valid MS Windows new line characters. If not, writes a question to the console and reads the + * answer. If it's 'Y' then backs up the original files and corrects the invalid new line characters. + * + * @param rootDir Root directory to process. + * @param dirs Directories inside of the root to process. + */ + private static void processCmdFiles(File rootDir, String... dirs) { + boolean answer = false; + + for (String dir : dirs) { + File subDir = new File(rootDir, dir); + + File[] cmdFiles = subDir.listFiles(new FilenameFilter() { + @Override public boolean accept(File dir, String name) { + return name.toLowerCase().endsWith(".cmd"); + } + }); + + for (File file : cmdFiles) { + String content = null; + + try (Scanner scanner = new Scanner(file)) { + content = scanner.useDelimiter("\\Z").next(); + } + catch (FileNotFoundException e) { + exit("Failed to read file '" + file + "'.", e); + } + + boolean invalid = false; + + for (int i = 0; i < content.length(); i++) { + if (content.charAt(i) == '\n' && (i == 0 || content.charAt(i - 1) != '\r')) { + invalid = true; + + break; + } + } + + if (invalid) { + answer = answer || ask("One or more *.CMD files have an invalid new line character. Replace them?"); + + if (!answer) { + println("Ok. 
But Windows most probably will fail to execute them..."); + + return; + } + + println("Fixing newline characters in file '" + file.getAbsolutePath() + "'."); + + renameToBak(file); + + try (BufferedWriter writer = new BufferedWriter(new FileWriter(file))) { + for (int i = 0; i < content.length(); i++) { + if (content.charAt(i) == '\n' && (i == 0 || content.charAt(i - 1) != '\r')) + writer.write("\r"); + + writer.write(content.charAt(i)); + } + } + catch (IOException e) { + exit("Failed to write file '" + file.getPath() + "': " + e.getMessage(), e); + } + } + } + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8446c4b7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopExternalTaskExecutor.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8446c4b7/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopCommandLineTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8446c4b7/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopPopularWordsTest.java ---------------------------------------------------------------------- diff --cc modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopPopularWordsTest.java index 0000000,5913d84..33c8fc3 mode 000000,100644..100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopPopularWordsTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopPopularWordsTest.java @@@ -1,0 -1,294 +1,294 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.internal.processors.hadoop; + + import com.google.common.collect.*; + import org.apache.hadoop.conf.*; + import org.apache.hadoop.fs.FileSystem; + import org.apache.hadoop.fs.*; + import org.apache.hadoop.io.*; + import org.apache.hadoop.mapreduce.*; + import org.apache.hadoop.mapreduce.lib.input.*; + import org.apache.hadoop.mapreduce.lib.output.*; + import org.apache.ignite.internal.util.typedef.*; + import org.apache.ignite.internal.util.typedef.internal.*; + + import java.io.*; + import java.util.*; + import java.util.Map.*; + + import static com.google.common.collect.Maps.*; + import static com.google.common.collect.MinMaxPriorityQueue.*; + import static java.util.Collections.*; + + /** + * Hadoop-based 10 popular words example: all files in a given directory are tokenized and for each word longer than + * 3 characters the number of occurrences is calculated. 
Finally, 10 words with the highest occurrence count are + output. + * + * NOTE: in order to run this example on Windows please ensure that Cygwin is installed and available in the system + * path. + */ + public class GridHadoopPopularWordsTest { - /** GridGain home. */ - private static final String IGNITE_HOME = U.getGridGainHome(); ++ /** Ignite home. */ ++ private static final String IGNITE_HOME = U.getIgniteHome(); + + /** The path to the input directory. All files in that directory will be processed. */ + private static final Path BOOKS_LOCAL_DIR = + new Path("file:" + IGNITE_HOME, "modules/tests/java/org/gridgain/grid/hadoop/books"); + + /** The path to the output directory. The result file will be written to this location. */ + private static final Path RESULT_LOCAL_DIR = + new Path("file:" + IGNITE_HOME, "modules/tests/java/org/gridgain/grid/hadoop/output"); + + /** Popular books source dir in DFS. */ + private static final Path BOOKS_DFS_DIR = new Path("tmp/word-count-example/in"); + + /** Popular books result dir in DFS. */ + private static final Path RESULT_DFS_DIR = new Path("tmp/word-count-example/out"); + + /** Path to the distributed file system configuration. */ + private static final String DFS_CFG = "examples/config/filesystem/core-site.xml"; + + /** Top N words to select **/ + private static final int POPULAR_WORDS_CNT = 10; + + /** + * For each token in the input string the mapper emits a {word, 1} pair. + */ + private static class TokenizingMapper extends Mapper<LongWritable, Text, Text, IntWritable> { + /** Constant value. */ + private static final IntWritable ONE = new IntWritable(1); + + /** The word converted into the Text. */ + private Text word = new Text(); + + /** + * Emits an entry where the key is the word and the value is always 1. + * + * @param key the current position in the input file (not used here) + * @param val the text string + * @param ctx mapper context + * @throws IOException + * @throws InterruptedException + */ + @Override protected void map(LongWritable key, Text val, Context ctx) + throws IOException, InterruptedException { + // Get the mapped object. + final String line = val.toString(); + + // Splits the given string to words. + final String[] words = line.split("[^a-zA-Z0-9]"); + + for (final String w : words) { + // Only emit counts for longer words. + if (w.length() <= 3) + continue; + + word.set(w); + + // Write the word into the context with the initial count equal to 1. + ctx.write(word, ONE); + } + } + } + + /** + * The reducer uses a priority queue to rank the words based on their number of occurrences. + */ + private static class TopNWordsReducer extends Reducer<Text, IntWritable, Text, IntWritable> { + private MinMaxPriorityQueue<Entry<Integer, String>> q; + + TopNWordsReducer() { + q = orderedBy(reverseOrder(new Comparator<Entry<Integer, String>>() { + @Override public int compare(Entry<Integer, String> o1, Entry<Integer, String> o2) { + return o1.getKey().compareTo(o2.getKey()); + } + })).expectedSize(POPULAR_WORDS_CNT).maximumSize(POPULAR_WORDS_CNT).create(); + } + + /** + * This method doesn't emit anything, but just keeps track of the top N words. + * + * @param key The word. + * @param vals The word counts. + * @param ctx Reducer context. + * @throws IOException If failed. + * @throws InterruptedException If failed. 
+ */ + @Override public void reduce(Text key, Iterable<IntWritable> vals, Context ctx) throws IOException, + InterruptedException { + int sum = 0; + + for (IntWritable val : vals) + sum += val.get(); + + q.add(immutableEntry(sum, key.toString())); + } + + /** + * This method is called after all the word entries have been processed. It writes the accumulated + * statistics to the job output file. + * + * @param ctx The job context. + * @throws IOException If failed. + * @throws InterruptedException If failed. + */ + @Override protected void cleanup(Context ctx) throws IOException, InterruptedException { + IntWritable i = new IntWritable(); + + Text txt = new Text(); + + // Iterate in descending order. + while (!q.isEmpty()) { + Entry<Integer, String> e = q.removeFirst(); + + i.set(e.getKey()); + + txt.set(e.getValue()); + + ctx.write(txt, i); + } + } + } + + /** + * Configures the Hadoop MapReduce job. + * + * @return Instance of the Hadoop MapRed job. + * @throws IOException If failed. + */ + private Job createConfigBasedHadoopJob() throws IOException { + Job jobCfg = new Job(); + + Configuration cfg = jobCfg.getConfiguration(); + + // Use explicit configuration of distributed file system, if provided. + if (DFS_CFG != null) - cfg.addResource(U.resolveGridGainUrl(DFS_CFG)); ++ cfg.addResource(U.resolveIgniteUrl(DFS_CFG)); + + jobCfg.setJobName("HadoopPopularWordExample"); + jobCfg.setJarByClass(GridHadoopPopularWordsTest.class); + jobCfg.setInputFormatClass(TextInputFormat.class); + jobCfg.setOutputKeyClass(Text.class); + jobCfg.setOutputValueClass(IntWritable.class); + jobCfg.setMapperClass(TokenizingMapper.class); + jobCfg.setReducerClass(TopNWordsReducer.class); + + FileInputFormat.setInputPaths(jobCfg, BOOKS_DFS_DIR); + FileOutputFormat.setOutputPath(jobCfg, RESULT_DFS_DIR); + + // The local job tracker allows only one task per wave, but the text input format + // replaces that with a value calculated from the input split size option. + if ("local".equals(cfg.get("mapred.job.tracker", "local"))) { + // Split job into tasks using 32MB split size. + FileInputFormat.setMinInputSplitSize(jobCfg, 32 * 1024 * 1024); + FileInputFormat.setMaxInputSplitSize(jobCfg, Long.MAX_VALUE); + } + + return jobCfg; + } + + /** + * Runs the Hadoop job. + * + * @return {@code True} if succeeded, {@code false} otherwise. + * @throws Exception If failed. + */ + private boolean runWordCountConfigBasedHadoopJob() throws Exception { + Job job = createConfigBasedHadoopJob(); + + // Distributed file system this job will work with. + FileSystem fs = FileSystem.get(job.getConfiguration()); + + X.println(">>> Using distributed file system: " + fs.getHomeDirectory()); + + // Prepare input and output job directories. + prepareDirectories(fs); + + long time = System.currentTimeMillis(); + + // Run job. + boolean res = job.waitForCompletion(true); + + X.println(">>> Job execution time: " + (System.currentTimeMillis() - time) / 1000 + " sec."); + + // Move job results into local file system, so you can view calculated results. + publishResults(fs); + + return res; + } + + /** + * Prepares the job's data: cleans up result directories that might have been left over + * after previous runs and copies input files from the local file system into DFS. + * + * @param fs Distributed file system to use in job. + * @throws IOException If failed. 
+ */ + private void prepareDirectories(FileSystem fs) throws IOException { + X.println(">>> Cleaning up DFS result directory: " + RESULT_DFS_DIR); + + fs.delete(RESULT_DFS_DIR, true); + + X.println(">>> Cleaning up DFS input directory: " + BOOKS_DFS_DIR); + + fs.delete(BOOKS_DFS_DIR, true); + + X.println(">>> Copy local files into DFS input directory: " + BOOKS_DFS_DIR); + + fs.copyFromLocalFile(BOOKS_LOCAL_DIR, BOOKS_DFS_DIR); + } + + /** + * Publishes job execution results into the local file system, so you can view them. + * + * @param fs Distributed file system used in job. + * @throws IOException If failed. + */ + private void publishResults(FileSystem fs) throws IOException { + X.println(">>> Cleaning up DFS input directory: " + BOOKS_DFS_DIR); + + fs.delete(BOOKS_DFS_DIR, true); + + X.println(">>> Cleaning up LOCAL result directory: " + RESULT_LOCAL_DIR); + + fs.delete(RESULT_LOCAL_DIR, true); + + X.println(">>> Moving job results into LOCAL result directory: " + RESULT_LOCAL_DIR); + + fs.copyToLocalFile(true, RESULT_DFS_DIR, RESULT_LOCAL_DIR); + } + + /** + * Executes a modified version of the Hadoop word count example. Here, in addition to counting the number of + * occurrences of the word in the source files, the N most popular words are selected. + * + * @param args None. + */ + public static void main(String[] args) { + try { + new GridHadoopPopularWordsTest().runWordCountConfigBasedHadoopJob(); + } + catch (Exception e) { + X.println(">>> Failed to run word count example: " + e.getMessage()); + } + + System.exit(0); + } + } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8446c4b7/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java ----------------------------------------------------------------------
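GridHadoopSetup above is interactive, but ask(...) auto-answers "Yes" whenever the IGNITE_HADOOP_SETUP_YES environment variable is non-empty, so the tool can also be driven unattended. Below is a small sketch of a wrapper that does this from a child JVM; the classpath is illustrative, while the main class name and environment variable names come from the diff above.

    import java.io.*;

    /** Sketch only: runs the setup tool unattended in a child JVM. */
    public class UnattendedHadoopSetup {
        public static void main(String[] args) throws IOException, InterruptedException {
            ProcessBuilder pb = new ProcessBuilder(
                "java",
                "-cp", "libs/*", // Illustrative classpath containing the Ignite JARs.
                "org.apache.ignite.internal.processors.hadoop.GridHadoopSetup");

            // Non-empty value makes every ask(...) prompt answer "Yes".
            pb.environment().put("IGNITE_HADOOP_SETUP_YES", "1");

            // Non-empty value makes exit(...) print stack traces on failure.
            pb.environment().put("IGNITE_HADOOP_SETUP_DEBUG", "1");

            // Forward the tool's console output to this process.
            pb.inheritIO();

            int code = pb.start().waitFor();

            System.out.println("Setup exited with code " + code);
        }
    }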
