This is an automated email from the ASF dual-hosted git repository.

stoty pushed a commit to branch 5.1
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/5.1 by this push:
     new 6fd287a08e PHOENIX-6721 CSV bulkload tool fails with FileNotFoundException if --output points to the S3 location
6fd287a08e is described below

commit 6fd287a08e55a2bd06c15a543c4c211b5e815986
Author: Sergey Soldatov <s...@apache.org>
AuthorDate: Tue May 31 13:37:20 2022 -0700

    PHOENIX-6721 CSV bulkload tool fails with FileNotFoundException if --output points to the S3 location
    
    Co-authored-by: Istvan Toth <st...@apache.org>
---
 .../phoenix/mapreduce/MultiHfileOutputFormat.java  | 25 +++++++++++-----------
 1 file changed, 13 insertions(+), 12 deletions(-)

diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/MultiHfileOutputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/MultiHfileOutputFormat.java
index 3a9071e123..a027f00400 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/MultiHfileOutputFormat.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/MultiHfileOutputFormat.java
@@ -67,10 +67,11 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
 import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
 import org.apache.phoenix.compat.hbase.CompatUtil;
 import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair;
@@ -113,7 +114,7 @@ public class MultiHfileOutputFormat extends FileOutputFormat<TableRowkeyPair, Ce
     @Override
     public RecordWriter<TableRowkeyPair, Cell> getRecordWriter(TaskAttemptContext context)
             throws IOException, InterruptedException {
-        return createRecordWriter(context);
+        return createRecordWriter(context, this.getOutputCommitter(context));
     }
 
     /**
@@ -122,11 +123,11 @@ public class MultiHfileOutputFormat extends FileOutputFormat<TableRowkeyPair, Ce
      * @return
      * @throws IOException 
      */
-    static <V extends Cell> RecordWriter<TableRowkeyPair, V> createRecordWriter(final TaskAttemptContext context)
+    static <V extends Cell> RecordWriter<TableRowkeyPair, V> createRecordWriter(
+        final TaskAttemptContext context, final OutputCommitter committer)
             throws IOException {
         // Get the path of the temporary output file
-        final Path outputPath = FileOutputFormat.getOutputPath(context);
-        final Path outputdir = new FileOutputCommitter(outputPath, context).getWorkPath();
+        final Path outputdir = ((PathOutputCommitter) committer).getWorkPath();
         final Configuration conf = context.getConfiguration();
         final FileSystem fs = outputdir.getFileSystem(conf);
      
@@ -336,7 +337,7 @@ public class MultiHfileOutputFormat extends FileOutputFormat<TableRowkeyPair, Ce
     static Map<byte[], Algorithm> createFamilyCompressionMap(Configuration conf,final String tableName) {
         Map<byte[], Algorithm> compressionMap = new TreeMap<byte[],Algorithm>(Bytes.BYTES_COMPARATOR);
         Map<String, String> tableConfigs = getTableConfigurations(conf, tableName);
-        if(tableConfigs == null) {
+        if (tableConfigs == null) {
             return compressionMap;
         }
         Map<byte[], String> stringMap = createFamilyConfValueMap(tableConfigs,COMPRESSION_FAMILIES_CONF_KEY);
@@ -355,7 +356,7 @@ public class MultiHfileOutputFormat extends FileOutputFormat<TableRowkeyPair, Ce
      */
     private static Map<String, String> getTableConfigurations(Configuration conf, final String tableName) {
         String tableDefn = conf.get(tableName);
-        if(StringUtils.isEmpty(tableDefn)) {
+        if (StringUtils.isEmpty(tableDefn)) {
             return null;
         }
         TargetTableRef table = TargetTableRefFunctions.FROM_JSON.apply(tableDefn);
@@ -374,7 +375,7 @@ public class MultiHfileOutputFormat extends FileOutputFormat<TableRowkeyPair, Ce
     static Map<byte[], BloomType> createFamilyBloomTypeMap(Configuration conf,final String tableName) {
         Map<byte[], BloomType> bloomTypeMap = new TreeMap<byte[],BloomType>(Bytes.BYTES_COMPARATOR);
         Map<String, String> tableConfigs = getTableConfigurations(conf, tableName);
-        if(tableConfigs == null) {
+        if (tableConfigs == null) {
             return bloomTypeMap;
         }
         Map<byte[], String> stringMap = createFamilyConfValueMap(tableConfigs,BLOOM_TYPE_FAMILIES_CONF_KEY);
@@ -396,7 +397,7 @@ public class MultiHfileOutputFormat extends FileOutputFormat<TableRowkeyPair, Ce
     static Map<byte[], Integer> createFamilyBlockSizeMap(Configuration conf,final String tableName) {
         Map<byte[], Integer> blockSizeMap = new TreeMap<byte[],Integer>(Bytes.BYTES_COMPARATOR);
         Map<String, String> tableConfigs = getTableConfigurations(conf, tableName);
-        if(tableConfigs == null) {
+        if (tableConfigs == null) {
             return blockSizeMap;
         }
         Map<byte[], String> stringMap = createFamilyConfValueMap(tableConfigs,BLOCK_SIZE_FAMILIES_CONF_KEY);
@@ -420,7 +421,7 @@ public class MultiHfileOutputFormat extends FileOutputFormat<TableRowkeyPair, Ce
         
         Map<byte[], DataBlockEncoding> encoderMap = new TreeMap<byte[],DataBlockEncoding>(Bytes.BYTES_COMPARATOR);
         Map<String, String> tableConfigs = getTableConfigurations(conf, tableName);
-        if(tableConfigs == null) {
+        if (tableConfigs == null) {
             return encoderMap;
         }
         Map<byte[], String> stringMap = createFamilyConfValueMap(tableConfigs,DATABLOCK_ENCODING_FAMILIES_CONF_KEY);
@@ -441,7 +442,7 @@ public class MultiHfileOutputFormat extends FileOutputFormat<TableRowkeyPair, Ce
     private static Map<byte[], String> createFamilyConfValueMap(Map<String,String> configs, String confName) {
         Map<byte[], String> confValMap = new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR);
         String confVal = configs.get(confName);
-        if(StringUtils.isEmpty(confVal)) {
+        if (StringUtils.isEmpty(confVal)) {
             return confValMap;
         }
         for (String familyConf : confVal.split("&")) {
@@ -677,7 +678,7 @@ public class MultiHfileOutputFormat extends FileOutputFormat<TableRowkeyPair, Ce
 
         // tableStartKeys for all tables.
         Set<TableRowkeyPair> tablesStartKeys = Sets.newTreeSet();
-        for(TargetTableRef table : tablesToBeLoaded) {
+        for (TargetTableRef table : tablesToBeLoaded) {
            final String tableName = table.getPhysicalName();
           try(Connection hbaseConn = ConnectionFactory.createConnection(conf);){
                 Set<TableRowkeyPair> startKeys =
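
For readers skimming the patch above: the substance of the fix is that createRecordWriter() no longer builds a fresh FileOutputCommitter on the --output path, and instead asks the committer the output format actually configured for the job where task output belongs. A minimal sketch of that pattern follows; WorkPathResolver is an illustrative helper written for this note, not code from the commit.

    import java.io.IOException;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.OutputCommitter;
    import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;

    // Hypothetical helper: isolates the lookup the fixed
    // createRecordWriter() now performs via its committer parameter.
    final class WorkPathResolver {
        private WorkPathResolver() { }

        // The filesystem committers shipped with Hadoop 3 (FileOutputCommitter
        // as well as the S3A committers used for s3a:// destinations) extend
        // PathOutputCommitter, so the task work path can be asked for directly
        // instead of being derived from the output path.
        static Path workPathFor(OutputCommitter committer) throws IOException {
            if (committer instanceof PathOutputCommitter) {
                return ((PathOutputCommitter) committer).getWorkPath();
            }
            throw new IOException("Committer has no work path: " + committer);
        }
    }

When --output points at S3, the job's committer is typically not a FileOutputCommitter, and its work path need not be ${output}/_temporary/...; a second FileOutputCommitter constructed directly on the S3 output path therefore refers to a directory the real committer never creates, which is the likely origin of the FileNotFoundException this commit addresses.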
