This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
     new b22b2188576 HIVE-28377: Add support for hive.output.file.extension to HCatStorer (Venkatasubrahmanian Narayanan, reviewed by Dmitriy Fingerman, Denys Kuzmenko, Yi Zhang)
b22b2188576 is described below

commit b22b218857636da11feecabd33e37371bad3e3a6
Author: Venkatasubrahmanian Narayanan <10137808+venkatsnaraya...@users.noreply.github.com>
AuthorDate: Tue Dec 17 12:47:40 2024 -0800

    HIVE-28377: Add support for hive.output.file.extension to HCatStorer (Venkatasubrahmanian Narayanan, reviewed by Dmitriy Fingerman, Denys Kuzmenko, Yi Zhang)

    Closes #5355
---
 .../DynamicPartitionFileRecordWriterContainer.java |   3 +-
 .../mapreduce/FileOutputFormatContainer.java       |   3 +-
 .../hive/hcatalog/mapreduce/HCatMapReduceTest.java |  13 +--
 .../mapreduce/TestHCatDynamicPartitioned.java      |  11 +-
 .../hive/hcatalog/mapreduce/TestHCatExtension.java | 112 +++++++++++++++++++++
 5 files changed, 131 insertions(+), 11 deletions(-)

diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java
index ceafabaa6eb..e43f5f60a6b 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java
@@ -208,9 +208,10 @@ class DynamicPartitionFileRecordWriterContainer extends FileRecordWriterContaine
       baseOutputCommitter.setupTask(currTaskContext);
 
       Path parentDir = new Path(currTaskContext.getConfiguration().get("mapred.work.output.dir"));
+      String extension = HiveConf.getVar(currTaskContext.getConfiguration(), HiveConf.ConfVars.OUTPUT_FILE_EXTENSION, "");
       Path childPath =
           new Path(parentDir, FileOutputFormat.getUniqueFile(currTaskContext,
-              currTaskContext.getConfiguration().get("mapreduce.output.basename", "part"), ""));
+              currTaskContext.getConfiguration().get("mapreduce.output.basename", "part"), extension));
 
       RecordWriter baseRecordWriter =
           baseOF.getRecordWriter(parentDir.getFileSystem(currTaskContext.getConfiguration()),
diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputFormatContainer.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputFormatContainer.java
index d503550d5da..154600da1f6 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputFormatContainer.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputFormatContainer.java
@@ -97,8 +97,9 @@ class FileOutputFormatContainer extends OutputFormatContainer {
           (org.apache.hadoop.mapred.RecordWriter)null, context);
     } else {
       Path parentDir = new Path(context.getConfiguration().get("mapred.work.output.dir"));
+      String extension = HiveConf.getVar(context.getConfiguration(), HiveConf.ConfVars.OUTPUT_FILE_EXTENSION,"");
       Path childPath = new Path(parentDir,FileOutputFormat.getUniqueName(new JobConf(context.getConfiguration()),
-          context.getConfiguration().get("mapreduce.output.basename", "part")));
+          context.getConfiguration().get("mapreduce.output.basename", "part")) + extension);
 
       rw = new StaticPartitionFileRecordWriterContainer(
           getBaseOutputFormat().getRecordWriter(
diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java
index 975cf3cc4be..5b8cf36efd6 100644
--- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java
+++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java
@@ -25,6 +25,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -97,7 +98,7 @@ public abstract class HCatMapReduceTest extends HCatBaseTest {
   private static List<HCatRecord> writeRecords = new ArrayList<HCatRecord>();
   private static List<HCatRecord> readRecords = new ArrayList<HCatRecord>();
 
-  private static FileSystem fs;
+  protected static FileSystem fs;
   private String externalTableLocation = null;
   protected String tableName;
   protected String serdeClass;
@@ -282,7 +283,7 @@ public abstract class HCatMapReduceTest extends HCatBaseTest {
   Job runMRCreate(Map<String, String> partitionValues, List<HCatFieldSchema> partitionColumns,
       List<HCatRecord> records, int writeCount, boolean assertWrite) throws Exception {
     return runMRCreate(partitionValues, partitionColumns, records, writeCount, assertWrite,
-        true, null);
+        true, Collections.emptyMap());
   }
 
   /**
@@ -298,12 +299,15 @@
    */
   Job runMRCreate(Map<String, String> partitionValues, List<HCatFieldSchema> partitionColumns,
       List<HCatRecord> records, int writeCount, boolean assertWrite, boolean asSingleMapTask,
-      String customDynamicPathPattern) throws Exception {
+      Map<String, String> properties) throws Exception {
     writeRecords = records;
     MapCreate.writeCount = 0;
 
     Configuration conf = new Configuration();
+    for (Map.Entry<String, String> entry : properties.entrySet()) {
+      conf.set(entry.getKey(), entry.getValue());
+    }
 
     Job job = new Job(conf, "hcat mapreduce write test");
     job.setJarByClass(this.getClass());
     job.setMapperClass(HCatMapReduceTest.MapCreate.class);
@@ -331,9 +335,6 @@ public abstract class HCatMapReduceTest extends HCatBaseTest {
     job.setOutputFormatClass(HCatOutputFormat.class);
 
     OutputJobInfo outputJobInfo = OutputJobInfo.create(dbName, tableName, partitionValues);
-    if (customDynamicPathPattern != null) {
-      job.getConfiguration().set(HCatConstants.HCAT_DYNAMIC_CUSTOM_PATTERN, customDynamicPathPattern);
-    }
     HCatOutputFormat.setOutput(job, outputJobInfo);
 
     job.setMapOutputKeyClass(BytesWritable.class);
diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
index 9ee887b933b..7bd750a9d5f 100644
--- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
+++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
@@ -21,6 +21,7 @@ package org.apache.hive.hcatalog.mapreduce;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 
 import org.junit.Assert;
@@ -122,10 +123,14 @@ public class TestHCatDynamicPartitioned extends HCatMapReduceTest {
   protected void runHCatDynamicPartitionedTable(boolean asSingleMapTask,
       String customDynamicPathPattern) throws Exception {
     generateWriteRecords(NUM_RECORDS, NUM_TOP_PARTITIONS, 0);
+    HashMap<String, String> properties = new HashMap<String, String>();
+    if (customDynamicPathPattern != null) {
+      properties.put(HCatConstants.HCAT_DYNAMIC_CUSTOM_PATTERN, customDynamicPathPattern);
+    }
     runMRCreate(null, dataColumns, writeRecords.subList(0,NUM_RECORDS/2), NUM_RECORDS/2,
-        true, asSingleMapTask, customDynamicPathPattern);
+        true, asSingleMapTask, properties);
     runMRCreate(null, dataColumns, writeRecords.subList(NUM_RECORDS/2,NUM_RECORDS), NUM_RECORDS/2,
-        true, asSingleMapTask, customDynamicPathPattern);
+        true, asSingleMapTask, properties);
 
     runMRRead(NUM_RECORDS);
 
@@ -149,7 +154,7 @@ public class TestHCatDynamicPartitioned extends HCatMapReduceTest {
     try {
       generateWriteRecords(NUM_RECORDS, NUM_TOP_PARTITIONS, 0);
       Job job = runMRCreate(null, dataColumns, writeRecords, NUM_RECORDS, false,
-          true, customDynamicPathPattern);
+          true, properties);
 
       if (HCatUtil.isHadoop23()) {
         Assert.assertTrue(job.isSuccessful()==false);
diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExtension.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExtension.java
new file mode 100644
index 00000000000..2c99fee0ff5
--- /dev/null
+++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExtension.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hive.hcatalog.mapreduce;
+
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hive.hcatalog.common.HCatException;
+import org.apache.hive.hcatalog.data.DefaultHCatRecord;
+import org.apache.hive.hcatalog.data.HCatRecord;
+import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hive.hcatalog.data.schema.HCatSchemaUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TestHCatExtension extends HCatMapReduceTest {
+  private static List<HCatRecord> writeRecords;
+  private static List<HCatFieldSchema> dataColumns;
+  protected static final int NUM_RECORDS = 20;
+  protected static final int NUM_TOP_PARTITIONS = 5;
+
+  public TestHCatExtension(String formatName, String serdeClass, String inputFormatClass, String outputFormatClass) throws Exception {
+    super(formatName, serdeClass, inputFormatClass, outputFormatClass);
+    tableName = "testHCatExtension_" + formatName;
+    generateWriteRecords(NUM_RECORDS, NUM_TOP_PARTITIONS, 0);
+    generateDataColumns();
+  }
+
+  protected static void generateDataColumns() throws HCatException {
+    dataColumns = new ArrayList<HCatFieldSchema>();
+    dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", serdeConstants.INT_TYPE_NAME, "")));
+    dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", serdeConstants.STRING_TYPE_NAME, "")));
+    dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("p1", serdeConstants.STRING_TYPE_NAME, "")));
+    dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("p2", serdeConstants.STRING_TYPE_NAME, "")));
+
+  }
+
+  protected static void generateWriteRecords(int max, int mod, int offset) {
+    writeRecords = new ArrayList<HCatRecord>();
+
+    for (int i = 0; i < max; i++) {
+      List<Object> objList = new ArrayList<Object>();
+
+      objList.add(i);
+      objList.add("strvalue" + i);
+      objList.add(String.valueOf((i % mod) + offset));
+      objList.add(String.valueOf((i / (max / 2)) + offset));
+      writeRecords.add(new DefaultHCatRecord(objList));
+    }
+  }
+
+  @Override
+  protected List<FieldSchema> getPartitionKeys() {
+    List<FieldSchema> fields = new ArrayList<FieldSchema>();
+    fields.add(new FieldSchema("p1", serdeConstants.STRING_TYPE_NAME, ""));
+    fields.add(new FieldSchema("p2", serdeConstants.STRING_TYPE_NAME, ""));
+    return fields;
+  }
+
+  @Override
+  protected List<FieldSchema> getTableColumns() {
+    List<FieldSchema> fields = new ArrayList<FieldSchema>();
+    fields.add(new FieldSchema("c1", serdeConstants.INT_TYPE_NAME, ""));
+    fields.add(new FieldSchema("c2", serdeConstants.STRING_TYPE_NAME, ""));
+    return fields;
+  }
+
+  @Test
+  public void testHCatExtension() throws Exception {
+    Map<String, String> properties = new HashMap<String, String>();
+    properties.put(HiveConf.ConfVars.OUTPUT_FILE_EXTENSION.varname, ".test");
+    generateWriteRecords(NUM_RECORDS, NUM_TOP_PARTITIONS, 0);
+    runMRCreate(null, dataColumns, writeRecords.subList(0,NUM_RECORDS/2), NUM_RECORDS/2,
+        true, false, properties);
+    runMRCreate(null, dataColumns, writeRecords.subList(NUM_RECORDS/2,NUM_RECORDS), NUM_RECORDS/2,
+        true, false, properties);
+    String databaseName = (dbName == null) ? Warehouse.DEFAULT_DATABASE_NAME : dbName;
+    Table tbl = client.getTable(databaseName, tableName);
+    RemoteIterator<LocatedFileStatus> ls = fs.listFiles(new Path(tbl.getSd().getLocation()), true);
+    while (ls.hasNext()) {
+      LocatedFileStatus lfs = ls.next();
+      Assert.assertFalse(lfs.isFile() && lfs.getPath().getName().startsWith("part") && !lfs.getPath().toString().endsWith(".test"));
+    }
+  }
+}
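
Editor's note: for readers who want to try the new setting outside the unit test, the sketch below shows one way a client MapReduce job writing through HCatOutputFormat (the same path HCatStorer uses) could set hive.output.file.extension. It is illustrative only and not part of the patch: the class name HCatExtensionExample, the "default"/"demo_table" names, and the ".csv" value are placeholder assumptions, and mapper/schema/input wiring is elided.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hive.hcatalog.mapreduce.HCatOutputFormat;
import org.apache.hive.hcatalog.mapreduce.OutputJobInfo;

// Illustrative driver; table/database names and the ".csv" value are placeholders.
public class HCatExtensionExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // New in HIVE-28377: FileOutputFormatContainer and
    // DynamicPartitionFileRecordWriterContainer append this value to the
    // names of the part files they create.
    conf.set(HiveConf.ConfVars.OUTPUT_FILE_EXTENSION.varname, ".csv");

    Job job = Job.getInstance(conf, "hcat write with output file extension");
    job.setOutputFormatClass(HCatOutputFormat.class);

    // Null partition values -> dynamic partitioning, as in TestHCatExtension.
    OutputJobInfo outputJobInfo = OutputJobInfo.create("default", "demo_table", null);
    HCatOutputFormat.setOutput(job, outputJobInfo);

    // Mapper, schema and input setup elided; once the job completes, files
    // under the table location should be named part-*.csv rather than plain part-*.
  }
}

As in the patched FileOutputFormatContainer, the extension is simply concatenated to the generated unique file name, so the configured value should include the leading dot (the test uses ".test").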