This is an automated email from the ASF dual-hosted git repository.
rskraba pushed a commit to branch branch-1.11
in repository https://gitbox.apache.org/repos/asf/avro.git
The following commit(s) were added to refs/heads/branch-1.11 by this push:
new 8caab7bd4 AVRO-3266: Work with hadoop 3.x PathOutputComitters (#1618)
8caab7bd4 is described below
commit 8caab7bd4a47e2c8e048f2c42478bdb22b18c993
Author: Emil Ejbyfeldt <[email protected]>
AuthorDate: Fri Jul 1 19:37:08 2022 +0200
AVRO-3266: Work with hadoop 3.x PathOutputComitters (#1618)
* AVRO-3266: Work with hadoop 3.x PathOutputComitters
In hadoop 3.x the abstract class PathOutputCommitter defines the method
`getWorkPath()`, but in hadoop 2.x it is only defined on
FileOutputCommitter. So, to be compatible with both hadoop 2.x and 3.x
and to support committers that only implement PathOutputCommitter and
not FileOutputCommitter, we make the call to getWorkPath using
reflection.
* AVRO-3266: Add committer class name to exception
* Add comment on better fix when hadoop2 is dropped
Co-authored-by: Ryan Skraba <[email protected]>
---
.../org/apache/avro/mapreduce/AvroOutputFormatBase.java | 17 +++++++++++++++--
1 file changed, 15 insertions(+), 2 deletions(-)
diff --git
a/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroOutputFormatBase.java
b/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroOutputFormatBase.java
index 587d787f2..5aa84b341 100644
---
a/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroOutputFormatBase.java
+++
b/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroOutputFormatBase.java
@@ -20,8 +20,9 @@ package org.apache.avro.mapreduce;
import java.io.IOException;
import java.io.OutputStream;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.avro.AvroRuntimeException;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileConstants;
import org.apache.avro.hadoop.file.HadoopCodecFactory;
@@ -85,6 +86,18 @@ public abstract class AvroOutputFormatBase<K, V> extends
FileOutputFormat<K, V>
return CodecFactory.nullCodec();
}
+ private Path getWorkPathFromCommitter(TaskAttemptContext context) throws
IOException {
+ // When Hadoop 2 support is dropped, this method can be reduced to a simple cast
+ // See https://github.com/apache/avro/pull/1431/
+ OutputCommitter committer = getOutputCommitter(context);
+ try {
+ return (Path)
committer.getClass().getMethod("getWorkPath").invoke(committer);
+ } catch (ReflectiveOperationException e) {
+ throw new AvroRuntimeException(
+ "Committer: " + committer.getClass().getName() + " does not have
method getWorkPath", e);
+ }
+ }
+
/**
* Gets the target output stream where the Avro container file should be
* written.
@@ -93,7 +106,7 @@ public abstract class AvroOutputFormatBase<K, V> extends
FileOutputFormat<K, V>
* @return The target output stream.
*/
protected OutputStream getAvroFileOutputStream(TaskAttemptContext context)
throws IOException {
- Path path = new Path(((FileOutputCommitter)
getOutputCommitter(context)).getWorkPath(),
+ Path path = new Path(getWorkPathFromCommitter(context),
getUniqueFile(context,
context.getConfiguration().get("avro.mo.config.namedOutput", "part"),
org.apache.avro.mapred.AvroOutputFormat.EXT));
return path.getFileSystem(context.getConfiguration()).create(path);