This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 3054813a0 [CELEBORN-856] Add mapreduce integration test
3054813a0 is described below

commit 3054813a0f1060359e0bd12f05b19eaef344c7a1
Author: sychen <[email protected]>
AuthorDate: Wed Nov 22 14:36:29 2023 +0800

    [CELEBORN-856] Add mapreduce integration test
    
    ### What changes were proposed in this pull request?
    
    ### Why are the changes needed?
    
    ### Does this PR introduce _any_ user-facing change?
    
    ### How was this patch tested?
    
    Closes #2073 from cxzl25/CELEBORN-856.
    
    Authored-by: sychen <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 .github/workflows/maven.yml                        |  31 ++++
 .github/workflows/sbt.yml                          |  27 ++++
 pom.xml                                            |   1 +
 project/CelebornBuild.scala                        |  53 +++++-
 tests/mr-it/pom.xml                                | 177 +++++++++++++++++++++
 .../src/test/resources/container-log4j.properties  |  25 +++
 tests/mr-it/src/test/resources/log4j2-test.xml     |  41 +++++
 .../apache/celeborn/tests/mr/WordCountTest.scala   | 149 +++++++++++++++++
 8 files changed, 502 insertions(+), 2 deletions(-)

diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml
index 43e602d63..52f7bb7a2 100644
--- a/.github/workflows/maven.yml
+++ b/.github/workflows/maven.yml
@@ -177,3 +177,34 @@ jobs:
           name: flink-${{ matrix.flink }}-unit-test-log
           path: |
             **/target/unit-tests.log
+
+  mr:
+    runs-on: ubuntu-22.04
+    strategy:
+      fail-fast: false
+      matrix:
+        java:
+          - 8
+          - 11
+    steps:
+      - uses: actions/checkout@v2
+      - name: Setup JDK ${{ matrix.java }}
+        uses: actions/setup-java@v2
+        with:
+          distribution: zulu
+          java-version: ${{ matrix.java }}
+          cache: maven
+          check-latest: false
+      - name: Test with Maven
+        run: |
+          PROFILES="-Pgoogle-mirror,mr"
+          TEST_MODULES="client-mr/mr,client-mr/mr-shaded,tests/mr-it"
+          build/mvn $PROFILES -pl $TEST_MODULES -am clean install -DskipTests
+          build/mvn $PROFILES -pl $TEST_MODULES test
+      - name: Upload test log
+        if: failure()
+        uses: actions/upload-artifact@v3
+        with:
+          name: mr-unit-test-log
+          path: |
+            **/target/unit-tests.log
diff --git a/.github/workflows/sbt.yml b/.github/workflows/sbt.yml
index 740e6bdf7..71a58d36f 100644
--- a/.github/workflows/sbt.yml
+++ b/.github/workflows/sbt.yml
@@ -222,3 +222,30 @@ jobs:
             name: flink-${{ matrix.flink }}-unit-test-log
             path: |
                 **/target/test-reports/**
+
+  mr:
+    runs-on: ubuntu-22.04
+    strategy:
+      fail-fast: false
+      matrix:
+        java:
+          - 8
+          - 11
+    steps:
+      - uses: actions/checkout@v2
+      - name: Setup JDK ${{ matrix.java }}
+        uses: actions/setup-java@v2
+        with:
+          distribution: zulu
+          java-version: ${{ matrix.java }}
+          check-latest: false
+      - name: Test with SBT
+        run: |
+          build/sbt -Pmr "clean; celeborn-mr-group/test"
+      - name: Upload test log
+        if: failure()
+        uses: actions/upload-artifact@v3
+        with:
+          name: mr-unit-test-log
+          path: |
+            **/target/test-reports/**
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 1927ebffe..34dfaa680 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1312,6 +1312,7 @@
       <modules>
         <module>client-mr/mr</module>
         <module>client-mr/mr-shaded</module>
+        <module>tests/mr-it</module>
       </modules>
     </profile>
 
diff --git a/project/CelebornBuild.scala b/project/CelebornBuild.scala
index 3d7b748cf..cf6b43e19 100644
--- a/project/CelebornBuild.scala
+++ b/project/CelebornBuild.scala
@@ -34,7 +34,7 @@ object Dependencies {
 
   val zstdJniVersion = 
sparkClientProjects.map(_.zstdJniVersion).getOrElse("1.5.2-1")
   val lz4JavaVersion = 
sparkClientProjects.map(_.lz4JavaVersion).getOrElse("1.8.0")
-  
+
   // Dependent library versions
   val commonsCompressVersion = "1.4.1"
   val commonsCryptoVersion = "1.0.0"
@@ -933,6 +933,24 @@ object MRClientProjects {
       )
   }
 
+  def mrIt: Project = {
+    Project("celeborn-mr-it", file("tests/mr-it"))
+      // ref: 
https://www.scala-sbt.org/1.x/docs/Multi-Project.html#Classpath+dependencies
+      .dependsOn(CelebornCommon.common % "test->test;compile->compile")
+      .dependsOn(CelebornClient.client % "test->test;compile->compile")
+      .dependsOn(CelebornMaster.master % "test->test;compile->compile")
+      .dependsOn(CelebornWorker.worker % "test->test;compile->compile")
+      .dependsOn(mrClient % "test->test;compile->compile")
+      .settings(
+        commonSettings,
+        copyDepsSettings,
+        libraryDependencies ++= Seq(
+          "org.apache.hadoop" % "hadoop-client-minicluster" % 
Dependencies.hadoopVersion % "test",
+          "org.apache.hadoop" % "hadoop-mapreduce-examples" % 
Dependencies.hadoopVersion % "test"
+        ) ++ commonUnitTestDependencies
+      )
+  }
+
   def mrClientShade: Project = {
     Project("celeborn-client-mr-shaded", file("client-mr/mr-shaded"))
       .dependsOn(mrClient)
@@ -996,6 +1014,37 @@ object MRClientProjects {
   }
 
   def modules: Seq[Project] = {
-    Seq(mrClient, mrClientShade)
+    Seq(mrClient, mrIt, mrGroup, mrClientShade)
+  }
+
+  // for test only, don't use this group for any other projects
+  lazy val mrGroup = (project withId "celeborn-mr-group").aggregate(mrClient, 
mrIt)
+
+  val copyDeps = TaskKey[Unit]("copyDeps", "Copies needed dependencies to the 
build directory.")
+  val destPath = (Compile / crossTarget) {
+    _ / "mapreduce_lib"
   }
+
+  lazy val copyDepsSettings = Seq(
+    copyDeps := {
+      val dest = destPath.value
+      if (!dest.isDirectory() && !dest.mkdirs()) {
+        throw new java.io.IOException("Failed to create jars directory.")
+      }
+
+      (Compile / dependencyClasspath).value.map(_.data)
+        .filter { jar => jar.isFile() }
+        .foreach { jar =>
+          val destJar = new File(dest, jar.getName())
+          if (destJar.isFile()) {
+            destJar.delete()
+          }
+          Files.copy(jar.toPath(), destJar.toPath())
+        }
+    },
+    (Test / compile) := {
+      copyDeps.value
+      (Test / compile).value
+    }
+  )
 }
diff --git a/tests/mr-it/pom.xml b/tests/mr-it/pom.xml
new file mode 100644
index 000000000..7c3f1c1e4
--- /dev/null
+++ b/tests/mr-it/pom.xml
@@ -0,0 +1,177 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.celeborn</groupId>
+    <artifactId>celeborn-parent_${scala.binary.version}</artifactId>
+    <version>${project.version}</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>celeborn-mr-it_${scala.binary.version}</artifactId>
+  <packaging>jar</packaging>
+  <name>Celeborn MapReduce Integration Test</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.celeborn</groupId>
+      <artifactId>celeborn-common_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.celeborn</groupId>
+      <artifactId>celeborn-client_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.celeborn</groupId>
+      <artifactId>celeborn-master_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.celeborn</groupId>
+      <artifactId>celeborn-worker_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.celeborn</groupId>
+      <artifactId>celeborn-worker_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.celeborn</groupId>
+      
<artifactId>celeborn-client-mr-shaded_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-yarn-common</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-yarn-server-common</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-common</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-auth</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-yarn-server-web-proxy</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-yarn-server-nodemanager</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-common</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client-minicluster</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.xerial.snappy</groupId>
+          <artifactId>snappy-java</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>junit</groupId>
+          <artifactId>junit</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-examples</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client-api</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <!-- The compile scope is to generate mapreduce_lib -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-common</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-web-proxy</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-common</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>provided</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>copy-dependencies</id>
+            <goals>
+              <goal>copy-dependencies</goal>
+            </goals>
+            <phase>package</phase>
+            <configuration>
+              
<outputDirectory>${project.build.directory}/mapreduce_lib</outputDirectory>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/tests/mr-it/src/test/resources/container-log4j.properties 
b/tests/mr-it/src/test/resources/container-log4j.properties
new file mode 100644
index 000000000..c37e1d32f
--- /dev/null
+++ b/tests/mr-it/src/test/resources/container-log4j.properties
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# STDOUT Appender
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{1} - %m%n
+
+log4j.rootLogger=INFO, stdout
\ No newline at end of file
diff --git a/tests/mr-it/src/test/resources/log4j2-test.xml 
b/tests/mr-it/src/test/resources/log4j2-test.xml
new file mode 100644
index 000000000..9adcdccfd
--- /dev/null
+++ b/tests/mr-it/src/test/resources/log4j2-test.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<Configuration status="INFO">
+    <Appenders>
+        <Console name="stdout" target="SYSTEM_OUT">
+            <PatternLayout pattern="%d{yy/MM/dd HH:mm:ss,SSS} %p [%t] %c{1}: 
%m%n%ex"/>
+            <Filters>
+                <ThresholdFilter level="ERROR"/>
+            </Filters>
+        </Console>
+        <File name="file" fileName="target/unit-tests.log">
+            <PatternLayout pattern="%d{yy/MM/dd HH:mm:ss,SSS} %p [%t] %c{1}: 
%m%n%ex"/>
+        </File>
+    </Appenders>
+    <Loggers>
+        <Root level="INFO">
+            <AppenderRef ref="stdout"/>
+            <AppenderRef ref="file"/>
+        </Root>
+        <Logger name="org.sparkproject.jetty" level="WARN" additivity="false">
+            <AppenderRef ref="stdout"/>
+            <AppenderRef ref="file"/>
+        </Logger>
+    </Loggers>
+</Configuration>
diff --git 
a/tests/mr-it/src/test/scala/org/apache/celeborn/tests/mr/WordCountTest.scala 
b/tests/mr-it/src/test/scala/org/apache/celeborn/tests/mr/WordCountTest.scala
new file mode 100644
index 000000000..0f1bb91aa
--- /dev/null
+++ 
b/tests/mr-it/src/test/scala/org/apache/celeborn/tests/mr/WordCountTest.scala
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.tests.mr
+
+import java.io.File
+import java.nio.charset.StandardCharsets
+import java.nio.file.{Files, Paths}
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.examples.WordCount
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.{IntWritable, Text}
+import org.apache.hadoop.mapred.ShuffleHandler
+import org.apache.hadoop.mapreduce.{Job, MRJobConfig}
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
+import org.apache.hadoop.service.Service
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.server.MiniYARNCluster
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.funsuite.AnyFunSuite
+
+import org.apache.celeborn.common.internal.Logging
+import org.apache.celeborn.common.util.Utils
+import org.apache.celeborn.service.deploy.MiniClusterFeature
+import org.apache.celeborn.service.deploy.worker.Worker
+
+class WordCountTest extends AnyFunSuite with Logging with MiniClusterFeature
+  with BeforeAndAfterAll {
+  var workers: collection.Set[Worker] = null
+
+  var yarnCluster: MiniYARNCluster = null
+  var hadoopConf: Configuration = null
+
+  override def beforeAll(): Unit = {
+    logInfo("test initialized , setup celeborn mini cluster")
+    val masterConf = Map(
+      "celeborn.master.host" -> "localhost",
+      "celeborn.master.port" -> "9097")
+    val workerConf = Map("celeborn.master.endpoints" -> "localhost:9097")
+    workers = setUpMiniCluster(masterConf, workerConf)._2
+
+    hadoopConf = new Configuration()
+    hadoopConf.set("yarn.scheduler.capacity.root.queues", 
"default,other_queue")
+
+    hadoopConf.setInt("yarn.scheduler.capacity.root.default.capacity", 100)
+    hadoopConf.setInt("yarn.scheduler.capacity.root.default.maximum-capacity", 
100)
+    
hadoopConf.setInt("yarn.scheduler.capacity.root.other_queue.maximum-capacity", 
100)
+
+    hadoopConf.setStrings(
+      YarnConfiguration.NM_AUX_SERVICES,
+      ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID)
+    hadoopConf.setClass(
+      String.format(
+        YarnConfiguration.NM_AUX_SERVICE_FMT,
+        ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID),
+      classOf[ShuffleHandler],
+      classOf[Service])
+
+    yarnCluster = new MiniYARNCluster("MiniClusterWordCount", 1, 1, 1)
+    yarnCluster.init(hadoopConf)
+    yarnCluster.start()
+  }
+
+  override def afterAll(): Unit = {
+    logInfo("all test complete , stop celeborn mini cluster")
+    shutdownMiniCluster()
+    if (yarnCluster != null) {
+      yarnCluster.stop()
+    }
+  }
+
+  test("celeborn mr integration test - word count") {
+    val input = Utils.createTempDir(System.getProperty("java.io.tmpdir"), 
"input")
+    Files.write(
+      Paths.get(input.getPath, "v1.txt"),
+      "hello world celeborn".getBytes(StandardCharsets.UTF_8))
+    Files.write(
+      Paths.get(input.getPath, "v2.txt"),
+      "hello world mapreduce".getBytes(StandardCharsets.UTF_8))
+
+    val output = Utils.createTempDir(System.getProperty("java.io.tmpdir"), 
"output")
+    val mrOutputPath = new Path(output.getPath + File.separator + "mr_output")
+
+    val conf = new Configuration(yarnCluster.getConfig)
+    // YARN config
+    conf.set("yarn.app.mapreduce.am.job.recovery.enable", "false")
+    conf.set(
+      "yarn.app.mapreduce.am.command-opts",
+      "org.apache.celeborn.mapreduce.v2.app.MRAppMasterWithCeleborn")
+
+    // MapReduce config
+    conf.set("mapreduce.framework.name", "yarn")
+    conf.set("mapreduce.job.user.classpath.first", "true")
+
+    conf.set("mapreduce.job.reduce.slowstart.completedmaps", "1")
+    conf.set("mapreduce.celeborn.master.endpoints", "localhost:9097")
+    conf.set(
+      MRJobConfig.MAP_OUTPUT_COLLECTOR_CLASS_ATTR,
+      "org.apache.hadoop.mapred.CelebornMapOutputCollector")
+    conf.set(
+      "mapreduce.job.reduce.shuffle.consumer.plugin.class",
+      "org.apache.hadoop.mapreduce.task.reduce.CelebornShuffleConsumer")
+
+    val job = Job.getInstance(conf, "word count")
+    job.setJarByClass(classOf[WordCount])
+    job.setMapperClass(classOf[WordCount.TokenizerMapper])
+    job.setCombinerClass(classOf[WordCount.IntSumReducer])
+    job.setReducerClass(classOf[WordCount.IntSumReducer])
+    job.setOutputKeyClass(classOf[Text])
+    job.setOutputValueClass(classOf[IntWritable])
+    FileInputFormat.addInputPath(job, new Path(input.getPath))
+    FileOutputFormat.setOutputPath(job, mrOutputPath)
+
+    val mapreduceLibPath = 
(Utils.getCodeSourceLocation(getClass).split("/").dropRight(1) ++ Array(
+      "mapreduce_lib")).mkString("/")
+    val excludeJarList =
+      Seq("hadoop-client-api", "hadoop-client-runtime", 
"hadoop-client-minicluster")
+    Files.list(Paths.get(mapreduceLibPath)).iterator().asScala.foreach(path => 
{
+      if (!excludeJarList.exists(path.toFile.getPath.contains(_))) {
+        job.addFileToClassPath(new Path(path.toString))
+      }
+    })
+
+    val exitCode = job.waitForCompletion(true)
+    assert(exitCode, "Returned error code.")
+
+    val outputFilePath = Paths.get(mrOutputPath.toString, "part-r-00000")
+    assert(outputFilePath.toFile.exists())
+    assert(Files.readAllLines(outputFilePath).contains("celeborn\t1"))
+  }
+}

Reply via email to