This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new f95b36fb8 [GOBBLIN-1986] Log `FileStatus` for only the first N inputs
in `GobblinWorkUnitsInputFormat` (#3860)
f95b36fb8 is described below
commit f95b36fb88193546709229d1baa6962230fa88bb
Author: Kip Kohn <[email protected]>
AuthorDate: Wed Jan 17 11:11:39 2024 -0800
[GOBBLIN-1986] Log `FileStatus` for only the first N inputs in
`GobblinWorkUnitsInputFormat` (#3860)
---
.../runtime/mapreduce/GobblinWorkUnitsInputFormat.java | 11 ++++++++++-
1 file changed, 10 insertions(+), 1 deletion(-)
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/GobblinWorkUnitsInputFormat.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/GobblinWorkUnitsInputFormat.java
index f80998894..9d3c29bcc 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/GobblinWorkUnitsInputFormat.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/GobblinWorkUnitsInputFormat.java
@@ -60,6 +60,8 @@ import lombok.extern.slf4j.Slf4j;
public class GobblinWorkUnitsInputFormat extends InputFormat<LongWritable, Text> {
private static final String MAX_MAPPERS = GobblinWorkUnitsInputFormat.class.getName() + ".maxMappers";
+ private static final String MAX_INPUT_FILES_TO_LOG = GobblinWorkUnitsInputFormat.class.getName() + ".maxInputFilesToLog";
+ private static final int DEFAULT_MAX_INPUT_FILES_TO_LOG = 10;
/**
* Set max mappers used in MR job.
@@ -72,9 +74,14 @@ public class GobblinWorkUnitsInputFormat extends InputFormat<LongWritable, Text>
return conf.getInt(MAX_MAPPERS, Integer.MAX_VALUE);
}
+ public static int getMaxInputFilesToLog(Configuration conf) {
+ return conf.getInt(MAX_INPUT_FILES_TO_LOG, DEFAULT_MAX_INPUT_FILES_TO_LOG);
+ }
+
@Override
public List<InputSplit> getSplits(JobContext context)
throws IOException, InterruptedException {
+ int maxInputFilesToLog = this.getMaxInputFilesToLog(context.getConfiguration());
Path[] inputPaths = FileInputFormat.getInputPaths(context);
if (inputPaths == null || inputPaths.length == 0) {
@@ -91,7 +98,9 @@ public class GobblinWorkUnitsInputFormat extends InputFormat<LongWritable, Text>
if (inputs == null) {
throw new IOException(String.format("Path %s does not exist.", path));
}
- log.info(String.format("Found %d input files at %s: %s", inputs.length, path, Arrays.toString(inputs)));
+ int numInputsToLog = Math.min(inputs.length, maxInputFilesToLog);
+ FileStatus[] firstNumInputs = Arrays.copyOf(inputs, numInputsToLog);
+ log.info(String.format("Found %d input files at %s: **first %d only** %s", inputs.length, path, numInputsToLog, Arrays.toString(firstNumInputs)));
for (FileStatus input : inputs) {
allPaths.add(input.getPath().toString());
}