This is an automated email from the ASF dual-hosted git repository.

rskraba pushed a commit to branch branch-1.11
in repository https://gitbox.apache.org/repos/asf/avro.git


The following commit(s) were added to refs/heads/branch-1.11 by this push:
     new 8caab7bd4 AVRO-3266: Work with hadoop 3.x PathOutputComitters (#1618)
8caab7bd4 is described below

commit 8caab7bd4a47e2c8e048f2c42478bdb22b18c993
Author: Emil Ejbyfeldt <[email protected]>
AuthorDate: Fri Jul 1 19:37:08 2022 +0200

    AVRO-3266: Work with hadoop 3.x PathOutputComitters (#1618)
    
    * AVRO-3266: Work with hadoop 3.x PathOutputComitters
    
    In hadoop 3.x the abstract class PathOutputCommitter defines the method
    `getWorkPath()`, but in hadoop 2.x it is only defined on
    FileOutputCommitter. So to be compatible with both hadoop 2.x and 3.x
    and support committers that only implement PathOutputCommitter and
    not FileOutputCommitter we make the call to getWorkPath using
    reflection.
    
    * AVRO-3266: Add committer class name to exception
    
    * Add comment on better fix when hadoop2 is dropped
    
    Co-authored-by: Ryan Skraba <[email protected]>
---
 .../org/apache/avro/mapreduce/AvroOutputFormatBase.java | 17 +++++++++++++++--
 1 file changed, 15 insertions(+), 2 deletions(-)

diff --git 
a/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroOutputFormatBase.java
 
b/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroOutputFormatBase.java
index 587d787f2..5aa84b341 100644
--- 
a/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroOutputFormatBase.java
+++ 
b/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroOutputFormatBase.java
@@ -20,8 +20,9 @@ package org.apache.avro.mapreduce;
 
 import java.io.IOException;
 import java.io.OutputStream;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.OutputCommitter;
 
+import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileConstants;
 import org.apache.avro.hadoop.file.HadoopCodecFactory;
@@ -85,6 +86,18 @@ public abstract class AvroOutputFormatBase<K, V> extends 
FileOutputFormat<K, V>
     return CodecFactory.nullCodec();
   }
 
+  private Path getWorkPathFromCommitter(TaskAttemptContext context) throws 
IOException {
+    // When Hadoop 2 support is dropped, this method can be reduced to a simple cast
+    // See https://github.com/apache/avro/pull/1431/
+    OutputCommitter committer = getOutputCommitter(context);
+    try {
+      return (Path) 
committer.getClass().getMethod("getWorkPath").invoke(committer);
+    } catch (ReflectiveOperationException e) {
+      throw new AvroRuntimeException(
+          "Committer: " + committer.getClass().getName() + " does not have 
method getWorkPath", e);
+    }
+  }
+
   /**
    * Gets the target output stream where the Avro container file should be
    * written.
@@ -93,7 +106,7 @@ public abstract class AvroOutputFormatBase<K, V> extends 
FileOutputFormat<K, V>
    * @return The target output stream.
    */
   protected OutputStream getAvroFileOutputStream(TaskAttemptContext context) 
throws IOException {
-    Path path = new Path(((FileOutputCommitter) 
getOutputCommitter(context)).getWorkPath(),
+    Path path = new Path(getWorkPathFromCommitter(context),
         getUniqueFile(context, 
context.getConfiguration().get("avro.mo.config.namedOutput", "part"),
             org.apache.avro.mapred.AvroOutputFormat.EXT));
     return path.getFileSystem(context.getConfiguration()).create(path);

Reply via email to