This is an automated email from the ASF dual-hosted git repository.
abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/master by this push:
new 6aed5f24c TEZ-4617 prevent resource leaks for Closeables (#402)
(Dmitry Kryukov reviewed by Laszlo Bodor)
6aed5f24c is described below
commit 6aed5f24c4e46824dfb18c8849eda69ee979abaf
Author: Dmitry Kryukov <[email protected]>
AuthorDate: Thu Jan 22 14:12:29 2026 +0300
TEZ-4617 prevent resource leaks for Closeables (#402) (Dmitry Kryukov
reviewed by Laszlo Bodor)
---
.../apache/tez/dag/app/web/AMWebController.java | 13 +++--
.../examples/BroadcastAndOneToOneExample.java | 11 ++--
.../org/apache/tez/mapreduce/examples/Join.java | 60 ++++++++++++----------
.../tez/mapreduce/examples/RandomTextWriter.java | 55 +++++++++++---------
.../tez/mapreduce/examples/RandomWriter.java | 33 +++++++-----
5 files changed, 98 insertions(+), 74 deletions(-)
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java b/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java
index 2fcec1cf2..515219481 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java
@@ -894,7 +894,12 @@ public class AMWebController extends Controller {
@Override
public void render() {
response().setContentType(MimeType.HTML);
- PrintWriter pw = writer();
+ try (PrintWriter pw = writer()) {
+ render(pw);
+ }
+ }
+
+ private void render(PrintWriter pw) {
pw.write("<html>");
pw.write("<head>");
pw.write("<meta charset=\"utf-8\">");
@@ -903,11 +908,11 @@ public class AMWebController extends Controller {
pw.write("<body>");
if (historyUrl == null || historyUrl.isEmpty()) {
pw.write("<h1>Tez UI Url is not defined.</h1>" +
- "<p>To enable tracking url pointing to Tez UI, set the config <b>" +
- TezConfiguration.TEZ_HISTORY_URL_BASE + "</b> in the tez-site.xml.</p>");
+ "<p>To enable tracking url pointing to Tez UI, set the config <b>" +
+ TezConfiguration.TEZ_HISTORY_URL_BASE + "</b> in the tez-site.xml.</p>");
} else {
pw.write("<h1>Redirecting to Tez UI</h1>. <p>If you are not redirected shortly, click " +
- "<a href='" + historyUrl + "'><b>here</b></a></p>"
+ "<a href='" + historyUrl + "'><b>here</b></a></p>"
);
pw.write("<script type='text/javascript'>setTimeout(function() { " +
"window.location.replace('" + historyUrl + "');" +
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
index 44ed8364d..2e6232b99 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
@@ -129,12 +129,13 @@ public class BroadcastAndOneToOneExample extends Configured implements Tool {
int numBroadcastTasks = 2;
int numOneToOneTasks = 3;
+ int numNMs;
if (doLocalityCheck) {
- YarnClient yarnClient = YarnClient.createYarnClient();
- yarnClient.init(tezConf);
- yarnClient.start();
- int numNMs = yarnClient.getNodeReports(NodeState.RUNNING).size();
- yarnClient.stop();
+ try (YarnClient yarnClient = YarnClient.createYarnClient()) {
+ yarnClient.init(tezConf);
+ yarnClient.start();
+ numNMs = yarnClient.getNodeReports(NodeState.RUNNING).size();
+ }
// create enough 1-1 tasks to run in parallel
numOneToOneTasks = numNMs - numBroadcastTasks - 1;// 1 AM
if (numOneToOneTasks < 1) {
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/Join.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/Join.java
index 92be83637..3c0dcf77d 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/Join.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/Join.java
@@ -20,6 +20,7 @@ package org.apache.tez.mapreduce.examples;
import static org.apache.tez.mapreduce.examples.ExampleDriver.getTezDecoratedConfiguration;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
@@ -82,45 +83,52 @@ public class Join extends Configured implements Tool {
@SuppressWarnings("deprecation")
public int run(String[] args) throws Exception {
Configuration conf = getConf();
- JobClient client = new JobClient(conf);
+ try (JobClient client = new JobClient(conf)) {
+ return run(client, conf, args);
+ }
+ }
+
+ private int run(JobClient client, Configuration conf, String[] args)
+ throws IOException, ClassNotFoundException, InterruptedException {
ClusterStatus cluster = client.getClusterStatus();
int num_reduces = (int) (cluster.getMaxReduceTasks() * 0.9);
String join_reduces = conf.get(REDUCES_PER_HOST);
if (join_reduces != null) {
- num_reduces = cluster.getTaskTrackers() *
- Integer.parseInt(join_reduces);
+ num_reduces = cluster.getTaskTrackers() *
+ Integer.parseInt(join_reduces);
}
+
Job job = new Job(conf);
job.setJobName("join");
job.setJarByClass(Sort.class);
- job.setMapperClass(Mapper.class);
+ job.setMapperClass(Mapper.class);
job.setReducerClass(Reducer.class);
- Class<? extends InputFormat> inputFormatClass =
- SequenceFileInputFormat.class;
- Class<? extends OutputFormat> outputFormatClass =
- SequenceFileOutputFormat.class;
+ Class<? extends InputFormat> inputFormatClass =
+ SequenceFileInputFormat.class;
+ Class<? extends OutputFormat> outputFormatClass =
+ SequenceFileOutputFormat.class;
Class<? extends WritableComparable> outputKeyClass = BytesWritable.class;
Class<? extends Writable> outputValueClass = TupleWritable.class;
String op = "inner";
List<String> otherArgs = new ArrayList<String>();
- for(int i=0; i < args.length; ++i) {
+ for (int i = 0; i < args.length; ++i) {
try {
if ("-r".equals(args[i])) {
num_reduces = Integer.parseInt(args[++i]);
} else if ("-inFormat".equals(args[i])) {
- inputFormatClass =
- Class.forName(args[++i]).asSubclass(InputFormat.class);
+ inputFormatClass =
+ Class.forName(args[++i]).asSubclass(InputFormat.class);
} else if ("-outFormat".equals(args[i])) {
- outputFormatClass =
- Class.forName(args[++i]).asSubclass(OutputFormat.class);
+ outputFormatClass =
+ Class.forName(args[++i]).asSubclass(OutputFormat.class);
} else if ("-outKey".equals(args[i])) {
- outputKeyClass =
- Class.forName(args[++i]).asSubclass(WritableComparable.class);
+ outputKeyClass =
+ Class.forName(args[++i]).asSubclass(WritableComparable.class);
} else if ("-outValue".equals(args[i])) {
- outputValueClass =
- Class.forName(args[++i]).asSubclass(Writable.class);
+ outputValueClass =
+ Class.forName(args[++i]).asSubclass(Writable.class);
} else if ("-joinOp".equals(args[i])) {
op = args[++i];
} else {
@@ -131,7 +139,7 @@ public class Join extends Configured implements Tool {
return printUsage();
} catch (ArrayIndexOutOfBoundsException except) {
System.out.println("ERROR: Required parameter missing from " +
- args[i-1]);
+ args[i - 1]);
return printUsage(); // exits
}
}
@@ -144,17 +152,17 @@ public class Join extends Configured implements Tool {
return printUsage();
}
- FileOutputFormat.setOutputPath(job,
- new Path(otherArgs.remove(otherArgs.size() - 1)));
+ FileOutputFormat.setOutputPath(job,
+ new Path(otherArgs.remove(otherArgs.size() - 1)));
List<Path> plist = new ArrayList<Path>(otherArgs.size());
for (String s : otherArgs) {
plist.add(new Path(s));
}
job.setInputFormatClass(CompositeInputFormat.class);
- job.getConfiguration().set(CompositeInputFormat.JOIN_EXPR,
- CompositeInputFormat.compose(op, inputFormatClass,
- plist.toArray(new Path[0])));
+ job.getConfiguration().set(CompositeInputFormat.JOIN_EXPR,
+ CompositeInputFormat.compose(op, inputFormatClass,
+ plist.toArray(new Path[0])));
job.setOutputFormatClass(outputFormatClass);
job.setOutputKeyClass(outputKeyClass);
@@ -162,11 +170,11 @@ public class Join extends Configured implements Tool {
Date startTime = new Date();
System.out.println("Job started: " + startTime);
- int ret = job.waitForCompletion(true) ? 0 : 1 ;
+ int ret = job.waitForCompletion(true) ? 0 : 1;
Date end_time = new Date();
System.out.println("Job ended: " + end_time);
- System.out.println("The job took " +
- (end_time.getTime() - startTime.getTime()) /1000 + " seconds.");
+ System.out.println("The job took " +
+ (end_time.getTime() - startTime.getTime()) / 1000 + " seconds.");
return ret;
}
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RandomTextWriter.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RandomTextWriter.java
index 55404ba1b..a1e6dbf06 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RandomTextWriter.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RandomTextWriter.java
@@ -180,70 +180,75 @@ public class RandomTextWriter extends Configured implements Tool {
}
Configuration conf = getConf();
- JobClient client = new JobClient(conf);
+ try (JobClient client = new JobClient(conf)) {
+ return run(client, conf, args);
+ }
+ }
+
+ private int run(JobClient client, Configuration conf, String[] args)
+ throws IOException, ClassNotFoundException, InterruptedException {
ClusterStatus cluster = client.getClusterStatus();
int numMapsPerHost = conf.getInt(MAPS_PER_HOST, 10);
- long numBytesToWritePerMap = conf.getLong(BYTES_PER_MAP,
- 1*1024*1024*1024);
+ long numBytesToWritePerMap = conf.getLong(BYTES_PER_MAP, 1 * 1024 * 1024 * 1024);
if (numBytesToWritePerMap == 0) {
- System.err.println("Cannot have " + BYTES_PER_MAP +" set to 0");
+ System.err.println("Cannot have " + BYTES_PER_MAP + " set to 0");
return -2;
}
- long totalBytesToWrite = conf.getLong(TOTAL_BYTES,
- numMapsPerHost*numBytesToWritePerMap*cluster.getTaskTrackers());
+ long totalBytesToWrite = conf.getLong(TOTAL_BYTES,
+ numMapsPerHost * numBytesToWritePerMap * cluster.getTaskTrackers());
int numMaps = (int) (totalBytesToWrite / numBytesToWritePerMap);
if (numMaps == 0 && totalBytesToWrite > 0) {
numMaps = 1;
conf.setLong(BYTES_PER_MAP, totalBytesToWrite);
}
conf.setInt(MRJobConfig.NUM_MAPS, numMaps);
-
+
Job job = new Job(conf);
-
+
job.setJarByClass(RandomTextWriter.class);
job.setJobName("random-text-writer");
-
+
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
-
+
job.setInputFormatClass(RandomWriter.RandomInputFormat.class);
- job.setMapperClass(RandomTextMapper.class);
-
- Class<? extends OutputFormat> outputFormatClass =
- SequenceFileOutputFormat.class;
+ job.setMapperClass(RandomTextMapper.class);
+
+ Class<? extends OutputFormat> outputFormatClass =
+ SequenceFileOutputFormat.class;
List<String> otherArgs = new ArrayList<String>();
- for(int i=0; i < args.length; ++i) {
+ for (int i = 0; i < args.length; ++i) {
try {
if ("-outFormat".equals(args[i])) {
- outputFormatClass =
- Class.forName(args[++i]).asSubclass(OutputFormat.class);
+ outputFormatClass =
+ Class.forName(args[++i]).asSubclass(OutputFormat.class);
} else {
otherArgs.add(args[i]);
}
} catch (ArrayIndexOutOfBoundsException except) {
System.out.println("ERROR: Required parameter missing from " +
- args[i-1]);
+ args[i - 1]);
return printUsage(); // exits
}
}
job.setOutputFormatClass(outputFormatClass);
FileOutputFormat.setOutputPath(job, new Path(otherArgs.get(0)));
-
+
System.out.println("Running " + numMaps + " maps.");
-
+
// reducer NONE
job.setNumReduceTasks(0);
-
+
Date startTime = new Date();
System.out.println("Job started: " + startTime);
int ret = job.waitForCompletion(true) ? 0 : 1;
Date endTime = new Date();
System.out.println("Job ended: " + endTime);
- System.out.println("The job took " +
- (endTime.getTime() - startTime.getTime()) /1000 +
- " seconds.");
-
+ System.out.println("The job took " +
+ (endTime.getTime() - startTime.getTime()) / 1000 +
+ " seconds.");
+
return ret;
}
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RandomWriter.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RandomWriter.java
index 1627d688a..4b7a4e572 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RandomWriter.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RandomWriter.java
@@ -246,17 +246,22 @@ public class RandomWriter extends Configured implements Tool {
Path outDir = new Path(args[0]);
Configuration conf = getConf();
- JobClient client = new JobClient(conf);
+ try (JobClient client = new JobClient(conf)) {
+ return run(client, conf, outDir);
+ }
+ }
+
+ private int run(JobClient client, Configuration conf, Path outDir)
+ throws IOException, ClassNotFoundException, InterruptedException {
ClusterStatus cluster = client.getClusterStatus();
int numMapsPerHost = conf.getInt(MAPS_PER_HOST, 10);
- long numBytesToWritePerMap = conf.getLong(BYTES_PER_MAP,
- 1*1024*1024*1024);
+ long numBytesToWritePerMap = conf.getLong(BYTES_PER_MAP, 1 * 1024 * 1024 * 1024);
if (numBytesToWritePerMap == 0) {
System.err.println("Cannot have" + BYTES_PER_MAP + " set to 0");
return -2;
}
- long totalBytesToWrite = conf.getLong(TOTAL_BYTES,
- numMapsPerHost*numBytesToWritePerMap*cluster.getTaskTrackers());
+ long totalBytesToWrite = conf.getLong(TOTAL_BYTES,
+ numMapsPerHost * numBytesToWritePerMap * cluster.getTaskTrackers());
int numMaps = (int) (totalBytesToWrite / numBytesToWritePerMap);
if (numMaps == 0 && totalBytesToWrite > 0) {
numMaps = 1;
@@ -265,31 +270,31 @@ public class RandomWriter extends Configured implements Tool {
conf.setInt(MRJobConfig.NUM_MAPS, numMaps);
Job job = new Job(conf);
-
+
job.setJarByClass(RandomWriter.class);
job.setJobName("random-writer");
FileOutputFormat.setOutputPath(job, outDir);
job.setOutputKeyClass(BytesWritable.class);
job.setOutputValueClass(BytesWritable.class);
job.setInputFormatClass(RandomInputFormat.class);
- job.setMapperClass(RandomMapper.class);
+ job.setMapperClass(RandomMapper.class);
job.setReducerClass(Reducer.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
-
+
System.out.println("Running " + numMaps + " maps.");
-
+
// reducer NONE
job.setNumReduceTasks(0);
-
+
Date startTime = new Date();
System.out.println("Job started: " + startTime);
int ret = job.waitForCompletion(true) ? 0 : 1;
Date endTime = new Date();
System.out.println("Job ended: " + endTime);
- System.out.println("The job took " +
- (endTime.getTime() - startTime.getTime()) /1000 +
- " seconds.");
-
+ System.out.println("The job took " +
+ (endTime.getTime() - startTime.getTime()) / 1000 +
+ " seconds.");
+
return ret;
}