This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new b22b2188576 HIVE-28377: Add support for hive.output.file.extension to HCatStorer (Venkatasubrahmanian Narayanan, reviewed by Dmitriy Fingerman, Denys Kuzmenko, Yi Zhang)
b22b2188576 is described below

commit b22b218857636da11feecabd33e37371bad3e3a6
Author: Venkatasubrahmanian Narayanan <10137808+venkatsnaraya...@users.noreply.github.com>
AuthorDate: Tue Dec 17 12:47:40 2024 -0800

    HIVE-28377: Add support for hive.output.file.extension to HCatStorer (Venkatasubrahmanian Narayanan, reviewed by Dmitriy Fingerman, Denys Kuzmenko, Yi Zhang)
    
    Closes #5355
---
 .../DynamicPartitionFileRecordWriterContainer.java |   3 +-
 .../mapreduce/FileOutputFormatContainer.java       |   3 +-
 .../hive/hcatalog/mapreduce/HCatMapReduceTest.java |  13 +--
 .../mapreduce/TestHCatDynamicPartitioned.java      |  11 +-
 .../hive/hcatalog/mapreduce/TestHCatExtension.java | 112 +++++++++++++++++++++
 5 files changed, 131 insertions(+), 11 deletions(-)

diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java
index ceafabaa6eb..e43f5f60a6b 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java
@@ -208,9 +208,10 @@ class DynamicPartitionFileRecordWriterContainer extends FileRecordWriterContaine
       baseOutputCommitter.setupTask(currTaskContext);
 
       Path parentDir = new Path(currTaskContext.getConfiguration().get("mapred.work.output.dir"));
+      String extension = HiveConf.getVar(currTaskContext.getConfiguration(), HiveConf.ConfVars.OUTPUT_FILE_EXTENSION, "");
       Path childPath =
           new Path(parentDir, FileOutputFormat.getUniqueFile(currTaskContext,
-              currTaskContext.getConfiguration().get("mapreduce.output.basename", "part"), ""));
+              currTaskContext.getConfiguration().get("mapreduce.output.basename", "part"), extension));
 
       RecordWriter baseRecordWriter =
           baseOF.getRecordWriter(parentDir.getFileSystem(currTaskContext.getConfiguration()),
diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputFormatContainer.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputFormatContainer.java
index d503550d5da..154600da1f6 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputFormatContainer.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputFormatContainer.java
@@ -97,8 +97,9 @@ class FileOutputFormatContainer extends OutputFormatContainer {
           (org.apache.hadoop.mapred.RecordWriter)null, context);
     } else {
      Path parentDir = new Path(context.getConfiguration().get("mapred.work.output.dir"));
+      String extension = HiveConf.getVar(context.getConfiguration(), HiveConf.ConfVars.OUTPUT_FILE_EXTENSION,"");
      Path childPath = new Path(parentDir,FileOutputFormat.getUniqueName(new JobConf(context.getConfiguration()),
-               context.getConfiguration().get("mapreduce.output.basename", "part")));
+               context.getConfiguration().get("mapreduce.output.basename", "part")) + extension);
 
       rw = new StaticPartitionFileRecordWriterContainer(
           getBaseOutputFormat().getRecordWriter(
diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java
index 975cf3cc4be..5b8cf36efd6 100644
--- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java
+++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java
@@ -25,6 +25,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -97,7 +98,7 @@ public abstract class HCatMapReduceTest extends HCatBaseTest {
   private static List<HCatRecord> writeRecords = new ArrayList<HCatRecord>();
   private static List<HCatRecord> readRecords = new ArrayList<HCatRecord>();
 
-  private static FileSystem fs;
+  protected static FileSystem fs;
   private String externalTableLocation = null;
   protected String tableName;
   protected String serdeClass;
@@ -282,7 +283,7 @@ public abstract class HCatMapReduceTest extends HCatBaseTest {
   Job runMRCreate(Map<String, String> partitionValues, List<HCatFieldSchema> partitionColumns,
       List<HCatRecord> records, int writeCount, boolean assertWrite) throws Exception {
     return runMRCreate(partitionValues, partitionColumns, records, writeCount, assertWrite,
-        true, null);
+        true, Collections.emptyMap());
   }
 
   /**
@@ -298,12 +299,15 @@ public abstract class HCatMapReduceTest extends HCatBaseTest {
    */
   Job runMRCreate(Map<String, String> partitionValues, List<HCatFieldSchema> partitionColumns,
       List<HCatRecord> records, int writeCount, boolean assertWrite, boolean asSingleMapTask,
-      String customDynamicPathPattern) throws Exception {
+      Map<String, String> properties) throws Exception {
 
     writeRecords = records;
     MapCreate.writeCount = 0;
 
     Configuration conf = new Configuration();
+    for (Map.Entry<String, String> entry : properties.entrySet()) {
+      conf.set(entry.getKey(), entry.getValue());
+    }
     Job job = new Job(conf, "hcat mapreduce write test");
     job.setJarByClass(this.getClass());
     job.setMapperClass(HCatMapReduceTest.MapCreate.class);
@@ -331,9 +335,6 @@ public abstract class HCatMapReduceTest extends HCatBaseTest {
     job.setOutputFormatClass(HCatOutputFormat.class);
 
     OutputJobInfo outputJobInfo = OutputJobInfo.create(dbName, tableName, partitionValues);
-    if (customDynamicPathPattern != null) {
-      job.getConfiguration().set(HCatConstants.HCAT_DYNAMIC_CUSTOM_PATTERN, customDynamicPathPattern);
-    }
     HCatOutputFormat.setOutput(job, outputJobInfo);
 
     job.setMapOutputKeyClass(BytesWritable.class);
diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
index 9ee887b933b..7bd750a9d5f 100644
--- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
+++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
@@ -21,6 +21,7 @@ package org.apache.hive.hcatalog.mapreduce;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 
 import org.junit.Assert;
@@ -122,10 +123,14 @@ public class TestHCatDynamicPartitioned extends HCatMapReduceTest {
   protected void runHCatDynamicPartitionedTable(boolean asSingleMapTask,
       String customDynamicPathPattern) throws Exception {
     generateWriteRecords(NUM_RECORDS, NUM_TOP_PARTITIONS, 0);
+    HashMap<String, String> properties = new HashMap<String, String>();
+    if (customDynamicPathPattern != null) {
+      properties.put(HCatConstants.HCAT_DYNAMIC_CUSTOM_PATTERN, customDynamicPathPattern);
+    }
     runMRCreate(null, dataColumns, writeRecords.subList(0,NUM_RECORDS/2), NUM_RECORDS/2,
-        true, asSingleMapTask, customDynamicPathPattern);
+        true, asSingleMapTask, properties);
     runMRCreate(null, dataColumns, writeRecords.subList(NUM_RECORDS/2,NUM_RECORDS), NUM_RECORDS/2,
-        true, asSingleMapTask, customDynamicPathPattern);
+        true, asSingleMapTask, properties);
 
     runMRRead(NUM_RECORDS);
 
@@ -149,7 +154,7 @@ public class TestHCatDynamicPartitioned extends HCatMapReduceTest {
     try {
       generateWriteRecords(NUM_RECORDS, NUM_TOP_PARTITIONS, 0);
       Job job = runMRCreate(null, dataColumns, writeRecords, NUM_RECORDS, false,
-          true, customDynamicPathPattern);
+          true, properties);
 
       if (HCatUtil.isHadoop23()) {
         Assert.assertTrue(job.isSuccessful()==false);
diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExtension.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExtension.java
new file mode 100644
index 00000000000..2c99fee0ff5
--- /dev/null
+++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExtension.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hive.hcatalog.mapreduce;
+
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hive.hcatalog.common.HCatException;
+import org.apache.hive.hcatalog.data.DefaultHCatRecord;
+import org.apache.hive.hcatalog.data.HCatRecord;
+import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hive.hcatalog.data.schema.HCatSchemaUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TestHCatExtension extends HCatMapReduceTest {
+    private static List<HCatRecord> writeRecords;
+    private static List<HCatFieldSchema> dataColumns;
+    protected static final int NUM_RECORDS = 20;
+    protected static final int NUM_TOP_PARTITIONS = 5;
+
+    public TestHCatExtension(String formatName, String serdeClass, String inputFormatClass, String outputFormatClass) throws Exception {
+        super(formatName, serdeClass, inputFormatClass, outputFormatClass);
+        tableName = "testHCatExtension_" + formatName;
+        generateWriteRecords(NUM_RECORDS, NUM_TOP_PARTITIONS, 0);
+        generateDataColumns();
+    }
+
+    protected static void generateDataColumns() throws HCatException {
+        dataColumns = new ArrayList<HCatFieldSchema>();
+        dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", serdeConstants.INT_TYPE_NAME, "")));
+        dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", serdeConstants.STRING_TYPE_NAME, "")));
+        dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("p1", serdeConstants.STRING_TYPE_NAME, "")));
+        dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("p2", serdeConstants.STRING_TYPE_NAME, "")));
+
+    }
+
+    protected static void generateWriteRecords(int max, int mod, int offset) {
+        writeRecords = new ArrayList<HCatRecord>();
+
+        for (int i = 0; i < max; i++) {
+            List<Object> objList = new ArrayList<Object>();
+
+            objList.add(i);
+            objList.add("strvalue" + i);
+            objList.add(String.valueOf((i % mod) + offset));
+            objList.add(String.valueOf((i / (max / 2)) + offset));
+            writeRecords.add(new DefaultHCatRecord(objList));
+        }
+    }
+
+    @Override
+    protected List<FieldSchema> getPartitionKeys() {
+        List<FieldSchema> fields = new ArrayList<FieldSchema>();
+        fields.add(new FieldSchema("p1", serdeConstants.STRING_TYPE_NAME, ""));
+        fields.add(new FieldSchema("p2", serdeConstants.STRING_TYPE_NAME, ""));
+        return fields;
+    }
+
+    @Override
+    protected List<FieldSchema> getTableColumns() {
+        List<FieldSchema> fields = new ArrayList<FieldSchema>();
+        fields.add(new FieldSchema("c1", serdeConstants.INT_TYPE_NAME, ""));
+        fields.add(new FieldSchema("c2", serdeConstants.STRING_TYPE_NAME, ""));
+        return fields;
+    }
+
+    @Test
+    public void testHCatExtension() throws Exception {
+        Map<String, String> properties = new HashMap<String, String>();
+        properties.put(HiveConf.ConfVars.OUTPUT_FILE_EXTENSION.varname, ".test");
+        generateWriteRecords(NUM_RECORDS, NUM_TOP_PARTITIONS, 0);
+        runMRCreate(null, dataColumns, writeRecords.subList(0,NUM_RECORDS/2), NUM_RECORDS/2,
+                true, false, properties);
+        runMRCreate(null, dataColumns, writeRecords.subList(NUM_RECORDS/2,NUM_RECORDS), NUM_RECORDS/2,
+                true, false, properties);
+        String databaseName = (dbName == null) ? Warehouse.DEFAULT_DATABASE_NAME : dbName;
+        Table tbl = client.getTable(databaseName, tableName);
+        RemoteIterator<LocatedFileStatus> ls = fs.listFiles(new Path(tbl.getSd().getLocation()), true);
+        while (ls.hasNext()) {
+            LocatedFileStatus lfs = ls.next();
+            Assert.assertFalse(lfs.isFile() && lfs.getPath().getName().startsWith("part") && !lfs.getPath().toString().endsWith(".test"));
+        }
+    }
+}
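
For context, a minimal sketch (not part of the patch) of how a MapReduce client writing through HCatOutputFormat could use the new behavior: setting hive.output.file.extension in the job configuration before HCatOutputFormat.setOutput should make HCatalog's file record writers append that suffix to the generated part files. The database name, table name, and the ".csv" suffix below are illustrative assumptions, not taken from the commit.

// Illustrative sketch only; names and the ".csv" suffix are made-up examples.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hive.hcatalog.mapreduce.HCatOutputFormat;
import org.apache.hive.hcatalog.mapreduce.OutputJobInfo;

public class HCatExtensionExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // HiveConf.ConfVars.OUTPUT_FILE_EXTENSION is the "hive.output.file.extension" property.
    conf.set(HiveConf.ConfVars.OUTPUT_FILE_EXTENSION.varname, ".csv");

    Job job = Job.getInstance(conf, "hcat write with file extension");
    job.setOutputFormatClass(HCatOutputFormat.class);
    // With the extension set, part files should be named e.g. part-m-00000.csv
    // (the basename is still controlled by mapreduce.output.basename).
    HCatOutputFormat.setOutput(job, OutputJobInfo.create("default", "example_table", null));
    // ... configure mapper, output key/value classes, and input, then submit the job.
  }
}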
