cnauroth commented on code in PR #5519:
URL: https://github.com/apache/hadoop/pull/5519#discussion_r1153857994


##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/ManifestCommitterSupport.java:
##########
@@ -224,6 +231,23 @@ public static ManifestSuccessData createManifestOutcome(
     return outcome;
   }
 
+  /**
+   * Add heap information to IOStatisticSetters gauges, with a stage in front 
of every key.
+   * @param ioStatisticsSetters map to update
+   * @param stage stage
+   */
+  public static void addHeapInformation(IOStatisticsSetters 
ioStatisticsSetters,
+      String stage) {
+    // force a gc. bit of bad form but it makes for better numbers
+    System.gc();

Review Comment:
   This triggered a SpotBugs warning. Do you think the forced GC should go behind a 
config flag, default off, and turned on in the tests?



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/IOStatisticsSnapshot.java:
##########
@@ -222,6 +223,33 @@ public synchronized Map<String, MeanStatistic> 
meanStatistics() {
     return meanStatistics;
   }
 
+  @Override
+  public synchronized void setCounter(final String key, final long value) {
+    counters().put(key, value);
+  }
+
+  @Override
+  public synchronized void setGauge(final String key, final long value) {
+    gauges().put(key, value);
+
+  }
+
+  @Override
+  public synchronized void setMaximum(final String key, final long value) {
+    maximums().put(key, value);
+
+  }
+
+  @Override
+  public synchronized void setMinimum(final String key, final long value) {
+    minimums().put(key, value);
+  }
+
+  @Override
+  public void setMeanStatistic(final String key, final MeanStatistic value) {
+

Review Comment:
   `meanStatistics().put(key, value);`?



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestLoadManifestsStage.java:
##########
@@ -63,6 +81,10 @@ public void setup() throws Exception {
         .isGreaterThan(0);
   }
 
+  public long heapSize() {
+       return Runtime.getRuntime().totalMemory() - 
Runtime.getRuntime().freeMemory();

Review Comment:
   Nitpick: some indentation issues here.



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestLoadManifestsStage.java:
##########
@@ -134,8 +169,26 @@ public void testSaveThenLoadManyManifests() throws 
Throwable {
 
     // and skipping the rename stage (which is going to fail),
     // go straight to cleanup
-    new CleanupJobStage(getJobStageConfig()).apply(
+    new CleanupJobStage(stageConfig).apply(
         new CleanupJobStage.Arguments("", true, true, false));
+    addHeapInformation(heapInfo, "cleanup");
+
+    ManifestSuccessData success = createManifestOutcome(stageConfig, 
OP_STAGE_JOB_COMMIT);
+    success.snapshotIOStatistics(getStageStatistics());
+    success.getIOStatistics().aggregate(heapInfo);
+
+    Configuration conf = getConfiguration();
+    enableManifestCommitter(conf);
+    String reportDir = conf.getTrimmed(OPT_SUMMARY_REPORT_DIR, "");
+    Path reportDirPath = new Path(reportDir);
+    Path path = new Path(reportDirPath,
+        createJobSummaryFilename("TestLoadManifestsStage"));
+    final FileSystem summaryFS = path.getFileSystem(conf);
+    success.save(summaryFS, path, true);
+    LOG.info("Saved summary to {}", path);
+    ManifestPrinter showManifest = new ManifestPrinter();
+        ManifestSuccessData manifestSuccessData =

Review Comment:
   Nitpick: some indentation issues here.



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java:
##########
@@ -756,66 +756,75 @@ private void testConcurrentCommitTaskWithSubDir(int 
version)
     conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
         version);
 
-    conf.setClass("fs.file.impl", RLFS.class, FileSystem.class);
+    final String fileImpl = "fs.file.impl";
+    final String fileImplClassname = "org.apache.hadoop.fs.LocalFileSystem";
+    conf.setClass(fileImpl, RLFS.class, FileSystem.class);
     FileSystem.closeAll();
 
-    final JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
-    final FileOutputCommitter amCommitter =
-        new FileOutputCommitter(outDir, jContext);
-    amCommitter.setupJob(jContext);
-
-    final TaskAttemptContext[] taCtx = new TaskAttemptContextImpl[2];
-    taCtx[0] = new TaskAttemptContextImpl(conf, taskID);
-    taCtx[1] = new TaskAttemptContextImpl(conf, taskID1);
-
-    final TextOutputFormat[] tof = new TextOutputFormat[2];
-    for (int i = 0; i < tof.length; i++) {
-      tof[i] = new TextOutputFormat() {
-        @Override
-        public Path getDefaultWorkFile(TaskAttemptContext context,
-            String extension) throws IOException {
-          final FileOutputCommitter foc = (FileOutputCommitter)
-              getOutputCommitter(context);
-          return new Path(new Path(foc.getWorkPath(), SUB_DIR),
-              getUniqueFile(context, getOutputName(context), extension));
-        }
-      };
-    }
-
-    final ExecutorService executor = HadoopExecutors.newFixedThreadPool(2);
     try {
-      for (int i = 0; i < taCtx.length; i++) {
-        final int taskIdx = i;
-        executor.submit(new Callable<Void>() {
+      final JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+      final FileOutputCommitter amCommitter =
+          new FileOutputCommitter(outDir, jContext);
+      amCommitter.setupJob(jContext);
+
+      final TaskAttemptContext[] taCtx = new TaskAttemptContextImpl[2];
+      taCtx[0] = new TaskAttemptContextImpl(conf, taskID);
+      taCtx[1] = new TaskAttemptContextImpl(conf, taskID1);
+
+      final TextOutputFormat[] tof = new TextOutputFormat[2];
+      for (int i = 0; i < tof.length; i++) {
+        tof[i] = new TextOutputFormat() {
           @Override
-          public Void call() throws IOException, InterruptedException {
-            final OutputCommitter outputCommitter =
-                tof[taskIdx].getOutputCommitter(taCtx[taskIdx]);
-            outputCommitter.setupTask(taCtx[taskIdx]);
-            final RecordWriter rw =
-                tof[taskIdx].getRecordWriter(taCtx[taskIdx]);
-            writeOutput(rw, taCtx[taskIdx]);
-            outputCommitter.commitTask(taCtx[taskIdx]);
-            return null;
+          public Path getDefaultWorkFile(TaskAttemptContext context,
+              String extension) throws IOException {
+            final FileOutputCommitter foc = (FileOutputCommitter)
+                getOutputCommitter(context);
+            return new Path(new Path(foc.getWorkPath(), SUB_DIR),
+                getUniqueFile(context, getOutputName(context), extension));
           }
-        });
+        };
       }
-    } finally {
-      executor.shutdown();
-      while (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
-        LOG.info("Awaiting thread termination!");
+
+      final ExecutorService executor = HadoopExecutors.newFixedThreadPool(2);
+      try {
+        for (int i = 0; i < taCtx.length; i++) {
+          final int taskIdx = i;
+          executor.submit(new Callable<Void>() {
+            @Override
+            public Void call() throws IOException, InterruptedException {
+              final OutputCommitter outputCommitter =
+                  tof[taskIdx].getOutputCommitter(taCtx[taskIdx]);
+              outputCommitter.setupTask(taCtx[taskIdx]);
+              final RecordWriter rw =
+                  tof[taskIdx].getRecordWriter(taCtx[taskIdx]);
+              writeOutput(rw, taCtx[taskIdx]);
+              outputCommitter.commitTask(taCtx[taskIdx]);
+              return null;
+            }
+          });
+        }
+      } finally {
+        executor.shutdown();
+        while (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
+          LOG.info("Awaiting thread termination!");
+        }
       }
-    }
 
-    amCommitter.commitJob(jContext);
-    final RawLocalFileSystem lfs = new RawLocalFileSystem();
-    lfs.setConf(conf);
-    assertFalse("Must not end up with sub_dir/sub_dir",
-        lfs.exists(new Path(OUT_SUB_DIR, SUB_DIR)));
+      amCommitter.commitJob(jContext);
+      final RawLocalFileSystem lfs = new RawLocalFileSystem();
+      lfs.setConf(conf);
+      assertFalse("Must not end up with sub_dir/sub_dir",
+          lfs.exists(new Path(OUT_SUB_DIR, SUB_DIR)));
 
-    // validate output
-    validateContent(OUT_SUB_DIR);
-    FileUtil.fullyDelete(new File(outDir.toString()));
+      // validate output
+      validateContent(OUT_SUB_DIR);
+      FileUtil.fullyDelete(new File(outDir.toString()));
+    } finally {
+      // needed to avoid this test contaminating others in the same JVM
+      FileSystem.closeAll();
+      conf.set(fileImpl, fileImplClassname);
+      conf.set(fileImpl, fileImplClassname);

Review Comment:
   Duplicated line?
   
   I wasn't sure why we need to set the conf here in the finally block. Did 
something mutate it after line 761, and now we need to restore it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to