This is an automated email from the ASF dual-hosted git repository.
sbadhya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new c1d592f4f98 HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables (#5076) (Sourabh Badhya reviewed by Denys Kuzmenko, Krisztian Kasa)
c1d592f4f98 is described below
commit c1d592f4f98e49614ffaed57813983b579b9147f
Author: Sourabh Badhya <[email protected]>
AuthorDate: Thu Mar 21 15:48:05 2024 +0530
HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables (#5076) (Sourabh Badhya reviewed by Denys Kuzmenko, Krisztian Kasa)
---
.../iceberg/mr/hive/HiveIcebergInputFormat.java | 2 +-
.../mr/hive/HiveIcebergOutputCommitter.java | 66 +++
.../iceberg/mr/hive/HiveIcebergStorageHandler.java | 27 ++
.../mr/hive/IcebergMergeTaskProperties.java | 55 +++
.../test/queries/positive/iceberg_merge_files.q | 95 +++++
.../positive/llap/iceberg_merge_files.q.out | 458 +++++++++++++++++++++
.../test/resources/testconfiguration.properties | 2 +
.../hadoop/hive/ql/io/CombineHiveInputFormat.java | 13 +-
.../hive/ql/metadata/HiveStorageHandler.java | 17 +
.../hadoop/hive/ql/optimizer/GenMapRedUtils.java | 41 ++
.../hadoop/hive/ql/parse/SemanticAnalyzer.java | 13 +-
.../ql/plan/ConditionalResolverMergeFiles.java | 132 ++++--
.../hadoop/hive/ql/plan/MergeTaskProperties.java | 30 ++
13 files changed, 921 insertions(+), 30 deletions(-)
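At a glance, the change teaches Hive's conditional merge-file task to work against non-native tables: a storage handler can now advertise merge support and hand the planner both the files to merge and the storage format to merge them with. A minimal sketch of the provider side of that contract, using the three new HiveStorageHandler defaults shown in full below (listCommittedDataFiles and MyMergeTaskProperties are hypothetical helpers, not part of this commit):

    import java.io.IOException;
    import java.util.List;
    import java.util.Properties;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;
    import org.apache.hadoop.hive.ql.plan.MergeTaskProperties;

    public class MyStorageHandler extends DefaultStorageHandler {
      @Override
      public boolean supportsMergeFiles() {
        return true; // opt into the conditional merge-file task
      }

      @Override
      public List<FileStatus> getMergeTaskInputFiles(Properties properties) throws IOException {
        // hypothetical helper: the committed data files the merge task should read
        return listCommittedDataFiles(properties);
      }

      @Override
      public MergeTaskProperties getMergeTaskProperties(Properties properties) {
        // hypothetical implementation carrying the tmp location and storage format
        return new MyMergeTaskProperties(properties);
      }
    }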
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
index dd329c122aa..1ea78eeba54 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
@@ -191,7 +191,7 @@ public class HiveIcebergInputFormat extends MapredIcebergInputFormat<Record>
@Override
public boolean shouldSkipCombine(Path path, Configuration conf) {
- return true;
+ return false;
}
@Override
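Flipping shouldSkipCombine to false is what lets CombineHiveInputFormat combine Iceberg data files and feed the merge task multi-file splits. For reference, the callback belongs to CombineHiveInputFormat.AvoidSplitCombination; its shape (paraphrased) is:

    // Paraphrased from CombineHiveInputFormat: input formats implementing this
    // interface are asked, per path, whether split combination must be skipped.
    public interface AvoidSplitCombination {
      boolean shouldSkipCombine(Path path, Configuration conf) throws IOException;
    }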
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
index b4d5ce98f59..d9f3116ff84 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
@@ -45,7 +45,10 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.Context.Operation;
import org.apache.hadoop.hive.ql.Context.RewritePolicy;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
+import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.session.SessionStateUtil;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext;
@@ -167,6 +170,8 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
LOG.info("CommitTask found no serialized table in config for table: {}.", output);
}
}, IOException.class);
+
+ cleanMergeTaskInputFiles(jobConf, tableExecutor, context);
} finally {
if (tableExecutor != null) {
tableExecutor.shutdown();
@@ -741,4 +746,65 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
throw new NotFoundException("Can not read or parse committed file: %s", fileForCommitLocation);
}
}
+
+ public List<FileStatus> getOutputFiles(List<JobContext> jobContexts) throws IOException {
+ List<OutputTable> outputs = collectOutputs(jobContexts);
+ ExecutorService fileExecutor = fileExecutor(jobContexts.get(0).getJobConf());
+ ExecutorService tableExecutor = tableExecutor(jobContexts.get(0).getJobConf(), outputs.size());
+ Collection<FileStatus> dataFiles = new ConcurrentLinkedQueue<>();
+ try {
+ Tasks.foreach(outputs.stream().flatMap(kv -> kv.jobContexts.stream()
+ .map(jobContext -> new SimpleImmutableEntry<>(kv.table, jobContext))))
+ .suppressFailureWhenFinished()
+ .executeWith(tableExecutor)
+ .onFailure((output, exc) -> LOG.warn("Failed to retrieve merge input file for the table {}", output, exc))
+ .run(output -> {
+ JobContext jobContext = output.getValue();
+ JobConf jobConf = jobContext.getJobConf();
+ LOG.info("Retrieving output files for jobID: {}, table: {}", jobContext.getJobID(), output);
+
+ Table table = output.getKey();
+ FileSystem fileSystem = new Path(table.location()).getFileSystem(jobConf);
+ String jobLocation = generateJobLocation(table.location(), jobConf, jobContext.getJobID());
+ // list jobLocation to get number of forCommit files
+ // we do this because map/reduce num in jobConf is unreliable
+ // and we have no access to vertex status info
+ int numTasks = listForCommits(jobConf, jobLocation).size();
+ FilesForCommit results = collectResults(numTasks, fileExecutor, table.location(), jobContext,
+ table.io(), false);
+ for (DataFile dataFile : results.dataFiles()) {
+ FileStatus fileStatus = fileSystem.getFileStatus(new Path(dataFile.path().toString()));
+ dataFiles.add(fileStatus);
+ }
+ }, IOException.class);
+ } finally {
+ fileExecutor.shutdown();
+ if (tableExecutor != null) {
+ tableExecutor.shutdown();
+ }
+ }
+ return Lists.newArrayList(dataFiles);
+ }
+
+ private void cleanMergeTaskInputFiles(JobConf jobConf,
+ ExecutorService tableExecutor,
+ TaskAttemptContext context) throws IOException {
+ // Merge task has merged several files into one. Hence we need to remove the stale files.
+ // At this stage the file is written and task-committed, but the old files are still present.
+ if (jobConf.getInputFormat().getClass().isAssignableFrom(CombineHiveInputFormat.class)) {
+ MapWork mrwork = Utilities.getMapWork(jobConf);
+ if (mrwork != null) {
+ List<Path> mergedPaths = mrwork.getInputPaths();
+ if (mergedPaths != null) {
+ Tasks.foreach(mergedPaths)
+ .retry(3)
+ .executeWith(tableExecutor)
+ .run(path -> {
+ FileSystem fs = path.getFileSystem(context.getJobConf());
+ fs.delete(path, true);
+ }, IOException.class);
+ }
+ }
+ }
+ }
}
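Both new committer paths lean on Iceberg's Tasks utility. Shown standalone, the delete-with-retry pattern used by cleanMergeTaskInputFiles above looks like this (mergedPaths, executor and conf are stand-ins for the values pulled from MapWork and JobConf; this is a sketch, not new API):

    // Sketch of the retrying delete; stand-in variables, same builder calls as above.
    Tasks.foreach(mergedPaths)
        .retry(3)                    // absorb transient filesystem failures
        .executeWith(executor)       // optional thread pool, may be null
        .run(path -> {
          FileSystem fs = path.getFileSystem(conf);
          fs.delete(path, true);     // drop the pre-merge data file
        }, IOException.class);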
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index 7fcaebbfcad..147107638ac 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -45,6 +45,7 @@ import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
@@ -107,6 +108,7 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeDynamicListDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
+import org.apache.hadoop.hive.ql.plan.MergeTaskProperties;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
import org.apache.hadoop.hive.ql.session.SessionState;
@@ -2029,4 +2031,29 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
throw new SemanticException(String.format("Error while fetching the partitions due to: %s", e));
}
}
+
+ @Override
+ public boolean supportsMergeFiles() {
+ return true;
+ }
+
+ @Override
+ public List<FileStatus> getMergeTaskInputFiles(Properties properties) throws IOException {
+ String tableName = properties.getProperty(Catalogs.NAME);
+ String snapshotRef = properties.getProperty(Catalogs.SNAPSHOT_REF);
+ Configuration configuration = SessionState.getSessionConf();
+ List<JobContext> originalContextList = generateJobContext(configuration, tableName, snapshotRef);
+ List<JobContext> jobContextList = originalContextList.stream()
+ .map(TezUtil::enrichContextWithVertexId)
+ .collect(Collectors.toList());
+ if (jobContextList.isEmpty()) {
+ return Collections.emptyList();
+ }
+ return new HiveIcebergOutputCommitter().getOutputFiles(jobContextList);
+ }
+
+ @Override
+ public MergeTaskProperties getMergeTaskProperties(Properties properties) {
+ return new IcebergMergeTaskProperties(properties);
+ }
}
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergMergeTaskProperties.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergMergeTaskProperties.java
new file mode 100644
index 00000000000..ff47a801a5b
--- /dev/null
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergMergeTaskProperties.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.IOException;
+import java.util.Properties;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.StorageFormatDescriptor;
+import org.apache.hadoop.hive.ql.io.StorageFormatFactory;
+import org.apache.hadoop.hive.ql.plan.MergeTaskProperties;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.mr.Catalogs;
+
+public class IcebergMergeTaskProperties implements MergeTaskProperties {
+
+ private final Properties properties;
+ private static final StorageFormatFactory storageFormatFactory = new StorageFormatFactory();
+
+ IcebergMergeTaskProperties(Properties properties) {
+ this.properties = properties;
+ }
+
+ public Path getTmpLocation() {
+ String location = properties.getProperty(Catalogs.LOCATION);
+ return new Path(location + "/data/");
+ }
+
+ public StorageFormatDescriptor getStorageFormatDescriptor() throws IOException {
+ FileFormat fileFormat = FileFormat.fromString(properties.getProperty(TableProperties.DEFAULT_FILE_FORMAT,
+ TableProperties.DEFAULT_FILE_FORMAT_DEFAULT));
+ StorageFormatDescriptor descriptor = storageFormatFactory.get(fileFormat.name());
+ if (descriptor == null) {
+ throw new IOException("Unsupported storage format descriptor");
+ }
+ return descriptor;
+ }
+
+}
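The descriptor lookup is what later lets the merge MapWork be rewired to a concrete Hive format. Roughly, and assuming the table's default file format is ORC (an illustrative sketch, not code from this commit):

    // Illustrative only: Iceberg's default-file-format property resolves to a
    // Hive StorageFormatDescriptor carrying input/output format and SerDe names.
    StorageFormatFactory factory = new StorageFormatFactory();
    FileFormat fileFormat = FileFormat.fromString("orc");      // table default, e.g. ORC
    StorageFormatDescriptor descriptor = factory.get(fileFormat.name());
    String inputFormatClass = descriptor.getInputFormat();     // e.g. the ORC input format class
    String serdeClass = descriptor.getSerde();                 // e.g. the ORC SerDe class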
diff --git a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_merge_files.q b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_merge_files.q
new file mode 100644
index 00000000000..5d5cd7aa6d8
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_merge_files.q
@@ -0,0 +1,95 @@
+--! qt:dataset:src
+set hive.merge.mapredfiles=true;
+set hive.merge.mapfiles=true;
+set hive.merge.tezfiles=true;
+set hive.optimize.sort.dynamic.partition.threshold=-1;
+set mapred.reduce.tasks=5;
+set hive.blobstore.supported.schemes=hdfs,file;
+
+-- SORT_QUERY_RESULTS
+create table orc_part_source(key string, value string, ds string) partitioned by spec (ds) stored by iceberg stored as orc;
+create table orc_source(key string) stored by iceberg stored as orc;
+
+-- The partitioned table must have 2 files per partition (necessary for merge task)
+insert overwrite table orc_part_source partition(ds='102') select * from src;
+insert into table orc_part_source partition(ds='102') select * from src;
+insert overwrite table orc_part_source partition(ds='103') select * from src;
+insert into table orc_part_source partition(ds='103') select * from src;
+
+-- The unpartitioned table must have 2 files.
+insert overwrite table orc_source select key from src;
+insert into table orc_source select key from src;
+
+select count(*) from orc_source;
+select count(*) from orc_part_source;
+
+select count(distinct(file_path)) from default.orc_source.files;
+select count(distinct(file_path)) from default.orc_part_source.files;
+
+-- Insert into the tables both for unpartitioned and partitioned cases for ORC formats.
+insert into table orc_source select * from orc_source;
+insert into table orc_part_source select * from orc_part_source where ds = 102 union all select * from orc_part_source where ds = 103;
+
+select count(*) from orc_source;
+select count(*) from orc_part_source;
+
+select count(distinct(file_path)) from default.orc_source.files;
+select count(distinct(file_path)) from default.orc_part_source.files;
+
+create table parquet_part_source(key string, value string, ds string) partitioned by spec (ds) stored by iceberg stored as parquet;
+create table parquet_source(key string) stored by iceberg stored as parquet;
+
+-- The partitioned table must have 2 files per partition (necessary for merge task)
+insert overwrite table parquet_part_source partition(ds='102') select * from src;
+insert into table parquet_part_source partition(ds='102') select * from src;
+insert overwrite table parquet_part_source partition(ds='103') select * from src;
+insert into table parquet_part_source partition(ds='103') select * from src;
+
+-- The unpartitioned table must have 2 files.
+insert overwrite table parquet_source select key from src;
+insert into table parquet_source select key from src;
+
+select count(*) from parquet_source;
+select count(*) from parquet_part_source;
+
+select count(distinct(file_path)) from default.parquet_source.files;
+select count(distinct(file_path)) from default.parquet_part_source.files;
+
+-- Insert into the tables both for unpartitioned and partitioned cases for Parquet formats.
+insert into table parquet_source select * from parquet_source;
+insert into table parquet_part_source select * from parquet_part_source where ds = 102 union all select * from orc_part_source where ds = 103;
+
+select count(*) from parquet_source;
+select count(*) from parquet_part_source;
+
+select count(distinct(file_path)) from default.parquet_source.files;
+select count(distinct(file_path)) from default.parquet_part_source.files;
+
+create table avro_part_source(key string, value string, ds string) partitioned by spec (ds) stored by iceberg stored as avro;
+create table avro_source(key string) stored by iceberg stored as avro;
+
+-- The partitioned table must have 2 files per partition (necessary for merge task)
+insert overwrite table avro_part_source partition(ds='102') select * from src;
+insert into table avro_part_source partition(ds='102') select * from src;
+insert overwrite table avro_part_source partition(ds='103') select * from src;
+insert into table avro_part_source partition(ds='103') select * from src;
+
+-- The unpartitioned table must have 2 files.
+insert overwrite table avro_source select key from src;
+insert into table avro_source select key from src;
+
+select count(*) from avro_source;
+select count(*) from avro_part_source;
+
+select count(distinct(file_path)) from default.avro_source.files;
+select count(distinct(file_path)) from default.avro_part_source.files;
+
+-- Insert into the tables both for unpartitioned and partitioned cases for Avro formats.
+insert into table avro_source select * from avro_source;
+insert into table avro_part_source select * from avro_part_source where ds = 102 union all select * from avro_part_source where ds = 103;
+
+select count(*) from avro_source;
+select count(*) from avro_part_source;
+
+select count(distinct(file_path)) from default.avro_source.files;
+select count(distinct(file_path)) from default.avro_part_source.files;
diff --git a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_merge_files.q.out b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_merge_files.q.out
new file mode 100644
index 00000000000..a17c1130efc
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_merge_files.q.out
@@ -0,0 +1,458 @@
+PREHOOK: query: create table orc_part_source(key string, value string, ds string) partitioned by spec (ds) stored by iceberg stored as orc
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@orc_part_source
+POSTHOOK: query: create table orc_part_source(key string, value string, ds string) partitioned by spec (ds) stored by iceberg stored as orc
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@orc_part_source
+PREHOOK: query: create table orc_source(key string) stored by iceberg stored as orc
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@orc_source
+POSTHOOK: query: create table orc_source(key string) stored by iceberg stored as orc
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@orc_source
+PREHOOK: query: insert overwrite table orc_part_source partition(ds='102') select * from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@orc_part_source@ds=102
+POSTHOOK: query: insert overwrite table orc_part_source partition(ds='102') select * from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@orc_part_source@ds=102
+PREHOOK: query: insert into table orc_part_source partition(ds='102') select * from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@orc_part_source@ds=102
+POSTHOOK: query: insert into table orc_part_source partition(ds='102') select * from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@orc_part_source@ds=102
+PREHOOK: query: insert overwrite table orc_part_source partition(ds='103') select * from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@orc_part_source@ds=103
+POSTHOOK: query: insert overwrite table orc_part_source partition(ds='103') select * from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@orc_part_source@ds=103
+PREHOOK: query: insert into table orc_part_source partition(ds='103') select * from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@orc_part_source@ds=103
+POSTHOOK: query: insert into table orc_part_source partition(ds='103') select * from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@orc_part_source@ds=103
+PREHOOK: query: insert overwrite table orc_source select key from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@orc_source
+POSTHOOK: query: insert overwrite table orc_source select key from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@orc_source
+PREHOOK: query: insert into table orc_source select key from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@orc_source
+POSTHOOK: query: insert into table orc_source select key from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@orc_source
+PREHOOK: query: select count(*) from orc_source
+PREHOOK: type: QUERY
+PREHOOK: Input: default@orc_source
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from orc_source
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@orc_source
+#### A masked pattern was here ####
+1000
+PREHOOK: query: select count(*) from orc_part_source
+PREHOOK: type: QUERY
+PREHOOK: Input: default@orc_part_source
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from orc_part_source
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@orc_part_source
+#### A masked pattern was here ####
+2000
+PREHOOK: query: select count(distinct(file_path)) from default.orc_source.files
+PREHOOK: type: QUERY
+PREHOOK: Input: default@orc_source
+#### A masked pattern was here ####
+POSTHOOK: query: select count(distinct(file_path)) from default.orc_source.files
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@orc_source
+#### A masked pattern was here ####
+2
+PREHOOK: query: select count(distinct(file_path)) from default.orc_part_source.files
+PREHOOK: type: QUERY
+PREHOOK: Input: default@orc_part_source
+#### A masked pattern was here ####
+POSTHOOK: query: select count(distinct(file_path)) from default.orc_part_source.files
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@orc_part_source
+#### A masked pattern was here ####
+4
+PREHOOK: query: insert into table orc_source select * from orc_source
+PREHOOK: type: QUERY
+PREHOOK: Input: default@orc_source
+PREHOOK: Output: default@orc_source
+POSTHOOK: query: insert into table orc_source select * from orc_source
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@orc_source
+POSTHOOK: Output: default@orc_source
+PREHOOK: query: insert into table orc_part_source select * from orc_part_source where ds = 102 union all select * from orc_part_source where ds = 103
+PREHOOK: type: QUERY
+PREHOOK: Input: default@orc_part_source
+PREHOOK: Output: default@orc_part_source
+POSTHOOK: query: insert into table orc_part_source select * from orc_part_source where ds = 102 union all select * from orc_part_source where ds = 103
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@orc_part_source
+POSTHOOK: Output: default@orc_part_source
+PREHOOK: query: select count(*) from orc_source
+PREHOOK: type: QUERY
+PREHOOK: Input: default@orc_source
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from orc_source
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@orc_source
+#### A masked pattern was here ####
+2000
+PREHOOK: query: select count(*) from orc_part_source
+PREHOOK: type: QUERY
+PREHOOK: Input: default@orc_part_source
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from orc_part_source
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@orc_part_source
+#### A masked pattern was here ####
+4000
+PREHOOK: query: select count(distinct(file_path)) from default.orc_source.files
+PREHOOK: type: QUERY
+PREHOOK: Input: default@orc_source
+#### A masked pattern was here ####
+POSTHOOK: query: select count(distinct(file_path)) from default.orc_source.files
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@orc_source
+#### A masked pattern was here ####
+3
+PREHOOK: query: select count(distinct(file_path)) from default.orc_part_source.files
+PREHOOK: type: QUERY
+PREHOOK: Input: default@orc_part_source
+#### A masked pattern was here ####
+POSTHOOK: query: select count(distinct(file_path)) from default.orc_part_source.files
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@orc_part_source
+#### A masked pattern was here ####
+6
+PREHOOK: query: create table parquet_part_source(key string, value string, ds string) partitioned by spec (ds) stored by iceberg stored as parquet
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@parquet_part_source
+POSTHOOK: query: create table parquet_part_source(key string, value string, ds string) partitioned by spec (ds) stored by iceberg stored as parquet
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@parquet_part_source
+PREHOOK: query: create table parquet_source(key string) stored by iceberg stored as parquet
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@parquet_source
+POSTHOOK: query: create table parquet_source(key string) stored by iceberg stored as parquet
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@parquet_source
+PREHOOK: query: insert overwrite table parquet_part_source partition(ds='102') select * from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@parquet_part_source@ds=102
+POSTHOOK: query: insert overwrite table parquet_part_source partition(ds='102') select * from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@parquet_part_source@ds=102
+PREHOOK: query: insert into table parquet_part_source partition(ds='102') select * from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@parquet_part_source@ds=102
+POSTHOOK: query: insert into table parquet_part_source partition(ds='102') select * from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@parquet_part_source@ds=102
+PREHOOK: query: insert overwrite table parquet_part_source partition(ds='103') select * from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@parquet_part_source@ds=103
+POSTHOOK: query: insert overwrite table parquet_part_source partition(ds='103') select * from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@parquet_part_source@ds=103
+PREHOOK: query: insert into table parquet_part_source partition(ds='103') select * from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@parquet_part_source@ds=103
+POSTHOOK: query: insert into table parquet_part_source partition(ds='103') select * from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@parquet_part_source@ds=103
+PREHOOK: query: insert overwrite table parquet_source select key from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@parquet_source
+POSTHOOK: query: insert overwrite table parquet_source select key from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@parquet_source
+PREHOOK: query: insert into table parquet_source select key from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@parquet_source
+POSTHOOK: query: insert into table parquet_source select key from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@parquet_source
+PREHOOK: query: select count(*) from parquet_source
+PREHOOK: type: QUERY
+PREHOOK: Input: default@parquet_source
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from parquet_source
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@parquet_source
+#### A masked pattern was here ####
+1000
+PREHOOK: query: select count(*) from parquet_part_source
+PREHOOK: type: QUERY
+PREHOOK: Input: default@parquet_part_source
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from parquet_part_source
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@parquet_part_source
+#### A masked pattern was here ####
+2000
+PREHOOK: query: select count(distinct(file_path)) from default.parquet_source.files
+PREHOOK: type: QUERY
+PREHOOK: Input: default@parquet_source
+#### A masked pattern was here ####
+POSTHOOK: query: select count(distinct(file_path)) from default.parquet_source.files
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@parquet_source
+#### A masked pattern was here ####
+2
+PREHOOK: query: select count(distinct(file_path)) from default.parquet_part_source.files
+PREHOOK: type: QUERY
+PREHOOK: Input: default@parquet_part_source
+#### A masked pattern was here ####
+POSTHOOK: query: select count(distinct(file_path)) from default.parquet_part_source.files
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@parquet_part_source
+#### A masked pattern was here ####
+4
+PREHOOK: query: insert into table parquet_source select * from parquet_source
+PREHOOK: type: QUERY
+PREHOOK: Input: default@parquet_source
+PREHOOK: Output: default@parquet_source
+POSTHOOK: query: insert into table parquet_source select * from parquet_source
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@parquet_source
+POSTHOOK: Output: default@parquet_source
+PREHOOK: query: insert into table parquet_part_source select * from parquet_part_source where ds = 102 union all select * from orc_part_source where ds = 103
+PREHOOK: type: QUERY
+PREHOOK: Input: default@orc_part_source
+PREHOOK: Input: default@parquet_part_source
+PREHOOK: Output: default@parquet_part_source
+POSTHOOK: query: insert into table parquet_part_source select * from parquet_part_source where ds = 102 union all select * from orc_part_source where ds = 103
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@orc_part_source
+POSTHOOK: Input: default@parquet_part_source
+POSTHOOK: Output: default@parquet_part_source
+PREHOOK: query: select count(*) from parquet_source
+PREHOOK: type: QUERY
+PREHOOK: Input: default@parquet_source
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from parquet_source
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@parquet_source
+#### A masked pattern was here ####
+2000
+PREHOOK: query: select count(*) from parquet_part_source
+PREHOOK: type: QUERY
+PREHOOK: Input: default@parquet_part_source
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from parquet_part_source
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@parquet_part_source
+#### A masked pattern was here ####
+5000
+PREHOOK: query: select count(distinct(file_path)) from default.parquet_source.files
+PREHOOK: type: QUERY
+PREHOOK: Input: default@parquet_source
+#### A masked pattern was here ####
+POSTHOOK: query: select count(distinct(file_path)) from default.parquet_source.files
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@parquet_source
+#### A masked pattern was here ####
+3
+PREHOOK: query: select count(distinct(file_path)) from default.parquet_part_source.files
+PREHOOK: type: QUERY
+PREHOOK: Input: default@parquet_part_source
+#### A masked pattern was here ####
+POSTHOOK: query: select count(distinct(file_path)) from default.parquet_part_source.files
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@parquet_part_source
+#### A masked pattern was here ####
+6
+PREHOOK: query: create table avro_part_source(key string, value string, ds string) partitioned by spec (ds) stored by iceberg stored as avro
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@avro_part_source
+POSTHOOK: query: create table avro_part_source(key string, value string, ds string) partitioned by spec (ds) stored by iceberg stored as avro
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@avro_part_source
+PREHOOK: query: create table avro_source(key string) stored by iceberg stored as avro
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@avro_source
+POSTHOOK: query: create table avro_source(key string) stored by iceberg stored as avro
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@avro_source
+PREHOOK: query: insert overwrite table avro_part_source partition(ds='102') select * from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@avro_part_source@ds=102
+POSTHOOK: query: insert overwrite table avro_part_source partition(ds='102') select * from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@avro_part_source@ds=102
+PREHOOK: query: insert into table avro_part_source partition(ds='102') select * from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@avro_part_source@ds=102
+POSTHOOK: query: insert into table avro_part_source partition(ds='102') select * from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@avro_part_source@ds=102
+PREHOOK: query: insert overwrite table avro_part_source partition(ds='103') select * from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@avro_part_source@ds=103
+POSTHOOK: query: insert overwrite table avro_part_source partition(ds='103') select * from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@avro_part_source@ds=103
+PREHOOK: query: insert into table avro_part_source partition(ds='103') select * from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@avro_part_source@ds=103
+POSTHOOK: query: insert into table avro_part_source partition(ds='103') select * from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@avro_part_source@ds=103
+PREHOOK: query: insert overwrite table avro_source select key from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@avro_source
+POSTHOOK: query: insert overwrite table avro_source select key from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@avro_source
+PREHOOK: query: insert into table avro_source select key from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@avro_source
+POSTHOOK: query: insert into table avro_source select key from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@avro_source
+PREHOOK: query: select count(*) from avro_source
+PREHOOK: type: QUERY
+PREHOOK: Input: default@avro_source
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from avro_source
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@avro_source
+#### A masked pattern was here ####
+1000
+PREHOOK: query: select count(*) from avro_part_source
+PREHOOK: type: QUERY
+PREHOOK: Input: default@avro_part_source
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from avro_part_source
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@avro_part_source
+#### A masked pattern was here ####
+2000
+PREHOOK: query: select count(distinct(file_path)) from default.avro_source.files
+PREHOOK: type: QUERY
+PREHOOK: Input: default@avro_source
+#### A masked pattern was here ####
+POSTHOOK: query: select count(distinct(file_path)) from default.avro_source.files
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@avro_source
+#### A masked pattern was here ####
+2
+PREHOOK: query: select count(distinct(file_path)) from default.avro_part_source.files
+PREHOOK: type: QUERY
+PREHOOK: Input: default@avro_part_source
+#### A masked pattern was here ####
+POSTHOOK: query: select count(distinct(file_path)) from default.avro_part_source.files
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@avro_part_source
+#### A masked pattern was here ####
+4
+PREHOOK: query: insert into table avro_source select * from avro_source
+PREHOOK: type: QUERY
+PREHOOK: Input: default@avro_source
+PREHOOK: Output: default@avro_source
+POSTHOOK: query: insert into table avro_source select * from avro_source
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@avro_source
+POSTHOOK: Output: default@avro_source
+PREHOOK: query: insert into table avro_part_source select * from avro_part_source where ds = 102 union all select * from avro_part_source where ds = 103
+PREHOOK: type: QUERY
+PREHOOK: Input: default@avro_part_source
+PREHOOK: Output: default@avro_part_source
+POSTHOOK: query: insert into table avro_part_source select * from avro_part_source where ds = 102 union all select * from avro_part_source where ds = 103
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@avro_part_source
+POSTHOOK: Output: default@avro_part_source
+PREHOOK: query: select count(*) from avro_source
+PREHOOK: type: QUERY
+PREHOOK: Input: default@avro_source
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from avro_source
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@avro_source
+#### A masked pattern was here ####
+2000
+PREHOOK: query: select count(*) from avro_part_source
+PREHOOK: type: QUERY
+PREHOOK: Input: default@avro_part_source
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from avro_part_source
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@avro_part_source
+#### A masked pattern was here ####
+4000
+PREHOOK: query: select count(distinct(file_path)) from default.avro_source.files
+PREHOOK: type: QUERY
+PREHOOK: Input: default@avro_source
+#### A masked pattern was here ####
+POSTHOOK: query: select count(distinct(file_path)) from default.avro_source.files
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@avro_source
+#### A masked pattern was here ####
+3
+PREHOOK: query: select count(distinct(file_path)) from default.avro_part_source.files
+PREHOOK: type: QUERY
+PREHOOK: Input: default@avro_part_source
+#### A masked pattern was here ####
+POSTHOOK: query: select count(distinct(file_path)) from default.avro_part_source.files
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@avro_part_source
+#### A masked pattern was here ####
+6
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 290f4e3acad..f682c6ef7d3 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -410,6 +410,7 @@ erasurecoding.only.query.files=\
erasure_simple.q
iceberg.llap.query.files=\
+ iceberg_merge_files.q,\
llap_iceberg_read_orc.q,\
llap_iceberg_read_parquet.q,\
vectorized_iceberg_read_mixed.q,\
@@ -425,6 +426,7 @@ iceberg.llap.query.compactor.files=\
iceberg_optimize_table_unpartitioned.q
iceberg.llap.only.query.files=\
+ iceberg_merge_files.q,\
llap_iceberg_read_orc.q,\
llap_iceberg_read_parquet.q
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
index bd5284254f8..c1afa681ade 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
@@ -33,9 +33,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-
import com.google.common.annotations.VisibleForTesting;
-
import org.apache.hadoop.hive.common.StringInternUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,6 +56,7 @@ import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
@@ -65,6 +64,8 @@ import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.CombineFileSplit;
+
/**
* CombineHiveInputFormat is a parameterized InputFormat which looks at the path
@@ -370,8 +371,12 @@ public class CombineHiveInputFormat<K extends WritableComparable, V extends Writ
PartitionDesc part = HiveFileFormatUtils.getFromPathRecursively(
pathToPartitionInfo, path, IOPrepareCache.get().allocatePartitionDescMap());
TableDesc tableDesc = part.getTableDesc();
- if ((tableDesc != null) && tableDesc.isNonNative()) {
- return super.getSplits(job, numSplits);
+ if (tableDesc != null) {
+ boolean useDefaultFileFormat = part.getInputFileFormatClass()
+ .isAssignableFrom(tableDesc.getInputFileFormatClass());
+ if (tableDesc.isNonNative() && useDefaultFileFormat) {
+ return super.getSplits(job, numSplits);
+ }
}
// Use HiveInputFormat if any of the paths is not splittable
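The effect of the new guard: a non-native table only short-circuits to generic split combination while the partition still uses the table's own input format; once the merge resolver has rebound the PartitionDesc to a concrete format (ORC, Parquet, Avro), the path falls through to Hive's regular handling. Condensed, with names as in the hunk above:

    // Condensed view of the new gate in getSplits():
    boolean useDefaultFileFormat = part.getInputFileFormatClass()
        .isAssignableFrom(tableDesc.getInputFileFormatClass());
    if (tableDesc.isNonNative() && useDefaultFileFormat) {
      return super.getSplits(job, numSplits);   // combine the generic way, as before
    }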
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
index e9a0d139e90..a2c476c9ad5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
@@ -20,10 +20,12 @@ package org.apache.hadoop.hive.ql.metadata;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
+import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hive.common.classification.InterfaceAudience;
import org.apache.hadoop.hive.common.classification.InterfaceStability;
import org.apache.hadoop.hive.common.type.SnapshotContext;
@@ -58,6 +60,7 @@ import org.apache.hadoop.hive.ql.parse.UpdateSemanticAnalyzer;
import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.MergeTaskProperties;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
@@ -747,4 +750,18 @@ public interface HiveStorageHandler extends Configurable {
throw new UnsupportedOperationException("Storage handler does not support getting partitions by expression " +
"for a table.");
}
+
+ default boolean supportsMergeFiles() {
+ return false;
+ }
+
+ default List<FileStatus> getMergeTaskInputFiles(Properties properties) throws IOException {
+ throw new UnsupportedOperationException("Storage handler does not support getting merge input files " +
+ "for a table.");
+ }
+
+ default MergeTaskProperties getMergeTaskProperties(Properties properties) {
+ throw new UnsupportedOperationException("Storage handler does not support getting merge task properties " +
+ "for a table.");
+ }
}
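The consuming side treats these defaults as a capability probe before asking for anything concrete. Sketched roughly as ConditionalResolverMergeFiles does further down (conf, handlerClassName and taskProperties are stand-ins, and exception handling is elided):

    // Sketch of a caller probing the handler; stand-in variables.
    HiveStorageHandler handler = HiveUtils.getStorageHandler(conf, handlerClassName);
    if (handler != null && handler.supportsMergeFiles()) {
      List<FileStatus> inputs = handler.getMergeTaskInputFiles(taskProperties);
      MergeTaskProperties mergeProps = handler.getMergeTaskProperties(taskProperties);
      // size the inputs and rewire the merge MapWork accordingly
    }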
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index 525ecfbb13e..1eb256ceb86 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -41,8 +41,11 @@ import org.apache.hadoop.hive.common.StringInternUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.ddl.table.create.CreateTableDesc;
+import org.apache.hadoop.hive.ql.ddl.view.create.CreateMaterializedViewDesc;
import org.apache.hadoop.hive.ql.exec.ColumnInfo;
import org.apache.hadoop.hive.ql.exec.ConditionalTask;
import org.apache.hadoop.hive.ql.exec.DemuxOperator;
@@ -1720,6 +1723,41 @@ public final class GenMapRedUtils {
return newWork;
}
+ private static void setStorageHandlerAndProperties(ConditionalResolverMergeFilesCtx mrCtx, MoveWork work) {
+ Properties mergeTaskProperties = null;
+ String storageHandlerClass = null;
+ if (work.getLoadTableWork() != null) {
+ // Get the info from the table data
+ TableDesc tableDesc = work.getLoadTableWork().getTable();
+ storageHandlerClass = tableDesc.getProperties().getProperty(
+ org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE);
+ mergeTaskProperties = new Properties(tableDesc.getProperties());
+ } else {
+ // Get the info from the create table data
+ CreateTableDesc createTableDesc = work.getLoadFileWork().getCtasCreateTableDesc();
+ String location = null;
+ if (createTableDesc != null) {
+ storageHandlerClass = createTableDesc.getStorageHandler();
+ mergeTaskProperties = new Properties();
+ mergeTaskProperties.put(hive_metastoreConstants.META_TABLE_NAME, createTableDesc.getDbTableName());
+ location = createTableDesc.getLocation();
+ } else {
+ CreateMaterializedViewDesc createViewDesc = work.getLoadFileWork().getCreateViewDesc();
+ if (createViewDesc != null) {
+ storageHandlerClass = createViewDesc.getStorageHandler();
+ mergeTaskProperties = new Properties();
+ mergeTaskProperties.put(hive_metastoreConstants.META_TABLE_NAME, createViewDesc.getViewName());
+ location = createViewDesc.getLocation();
+ }
+ }
+ if (location != null) {
+ mergeTaskProperties.put(hive_metastoreConstants.META_TABLE_LOCATION, location);
+ }
+ }
+ mrCtx.setTaskProperties(mergeTaskProperties);
+ mrCtx.setStorageHandlerClass(storageHandlerClass);
+ }
+
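The context stashed here is deliberately plain: a handler class name plus a Properties bag that survives into runtime resolution. For the CTAS branch the bag would carry roughly the following (the keys are real constants, the values illustrative):

    // Illustrative contents of the stashed merge-task properties for a CTAS:
    Properties props = new Properties();
    props.put(hive_metastoreConstants.META_TABLE_NAME, "default.ctas_target");               // example value
    props.put(hive_metastoreConstants.META_TABLE_LOCATION, "hdfs:///warehouse/ctas_target"); // example value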
/**
* Construct a conditional task given the current leaf task, the MoveWork and the MapredWork.
*
@@ -1800,6 +1838,9 @@ public final class GenMapRedUtils {
cndTsk.setResolver(new ConditionalResolverMergeFiles());
ConditionalResolverMergeFilesCtx mrCtx =
new ConditionalResolverMergeFilesCtx(listTasks, condInputPath.toString());
+ if (moveTaskToLink != null) {
+ setStorageHandlerAndProperties(mrCtx, moveTaskToLink.getWork());
+ }
cndTsk.setResolverCtx(mrCtx);
// make the conditional task as the child of the current leaf task
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index b15495a67dc..dd3f75fa665 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -8159,8 +8159,19 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
boolean canBeMerged = (destinationTable == null || !((destinationTable.getNumBuckets() > 0) ||
(destinationTable.getSortCols() != null && destinationTable.getSortCols().size() > 0)));
- // If this table is working with ACID semantics, turn off merging
+ // If this table is working with ACID semantics or
+ // if it's a delete, update, merge operation that supports merge task, turn off merging
canBeMerged &= !destTableIsFullAcid;
+ if (destinationTable != null && destinationTable.getStorageHandler() != null) {
+ canBeMerged &= destinationTable.getStorageHandler().supportsMergeFiles();
+ // TODO: Support for merge task for update, delete and merge queries
+ // when storage handler supports it.
+ if (Context.Operation.UPDATE.equals(ctx.getOperation())
+ || Context.Operation.DELETE.equals(ctx.getOperation())
+ || Context.Operation.MERGE.equals(ctx.getOperation())) {
+ canBeMerged = false;
+ }
+ }
// Generate the partition columns from the parent input
if (destType == QBMetaData.DEST_TABLE || destType == QBMetaData.DEST_PARTITION) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
index c5aecaa9cae..05e54798840 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
@@ -28,6 +28,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.LongSummaryStatistics;
import java.util.Map;
+import java.util.Properties;
import java.util.stream.Collectors;
import com.google.common.collect.Lists;
@@ -37,12 +38,17 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.HiveStatsUtils;
+import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.utils.FileUtils;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
+import org.apache.hadoop.hive.ql.metadata.HiveUtils;
+import org.apache.hadoop.hive.serde.serdeConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,6 +75,8 @@ public class ConditionalResolverMergeFiles implements ConditionalResolver,
private String dir;
private DynamicPartitionCtx dpCtx; // merge task could be after dynamic partition insert
private ListBucketingCtx lbCtx;
+ private Properties properties;
+ private String storageHandlerClass;
public ConditionalResolverMergeFilesCtx() {
}
@@ -125,6 +133,22 @@ public class ConditionalResolverMergeFiles implements ConditionalResolver,
public void setLbCtx(ListBucketingCtx lbCtx) {
this.lbCtx = lbCtx;
}
+
+ public void setTaskProperties(Properties properties) {
+ this.properties = properties;
+ }
+
+ public Properties getTaskProperties() {
+ return properties;
+ }
+
+ public void setStorageHandlerClass(String className) {
+ this.storageHandlerClass = className;
+ }
+
+ public String getStorageHandlerClass() {
+ return storageHandlerClass;
+ }
}
public List<Task<?>> getTasks(HiveConf conf, Object objCtx) {
@@ -147,18 +171,21 @@ public class ConditionalResolverMergeFiles implements ConditionalResolver,
Path dirPath = new Path(dirName);
FileSystem inpFs = dirPath.getFileSystem(conf);
DynamicPartitionCtx dpCtx = ctx.getDPCtx();
+ HiveStorageHandler storageHandler = HiveUtils.getStorageHandler(conf, ctx.getStorageHandlerClass());
+ boolean dirExists = inpFs.exists(dirPath);
+ boolean useCustomStorageHandler = storageHandler != null && storageHandler.supportsMergeFiles();
+
+ MapWork work = null;
+ // For each dynamic partition, check if it needs to be merged.
+ if (mrTask.getWork() instanceof MapredWork) {
+ work = ((MapredWork) mrTask.getWork()).getMapWork();
+ } else if (mrTask.getWork() instanceof TezWork){
+ work = (MapWork) ((TezWork) mrTask.getWork()).getAllWork().get(0);
+ } else {
+ work = (MapWork) mrTask.getWork();
+ }
- if (inpFs.exists(dirPath)) {
- // For each dynamic partition, check if it needs to be merged.
- MapWork work;
- if (mrTask.getWork() instanceof MapredWork) {
- work = ((MapredWork) mrTask.getWork()).getMapWork();
- } else if (mrTask.getWork() instanceof TezWork){
- work = (MapWork) ((TezWork) mrTask.getWork()).getAllWork().get(0);
- } else {
- work = (MapWork) mrTask.getWork();
- }
-
+ if (dirExists) {
int lbLevel = (ctx.getLbCtx() == null) ? 0 : ctx.getLbCtx().calculateListBucketingLevel();
boolean manifestFilePresent = false;
FileSystem manifestFs = dirPath.getFileSystem(conf);
@@ -182,11 +209,11 @@ public class ConditionalResolverMergeFiles implements ConditionalResolver,
int dpLbLevel = numDPCols + lbLevel;
generateActualTasks(conf, resTsks, trgtSize, avgConditionSize, mvTask, mrTask,
- mrAndMvTask, dirPath, inpFs, ctx, work, dpLbLevel, manifestFilePresent);
+ mrAndMvTask, dirPath, inpFs, ctx, work, dpLbLevel, manifestFilePresent, storageHandler);
} else { // no dynamic partitions
if(lbLevel == 0) {
// static partition without list bucketing
- List<FileStatus> manifestFilePaths = new ArrayList<>();
+ List<FileStatus> manifestFilePaths = Lists.newArrayList();
long totalSize;
if (manifestFilePresent) {
manifestFilePaths = getManifestFilePaths(conf, dirPath);
@@ -208,15 +235,20 @@ public class ConditionalResolverMergeFiles implements ConditionalResolver,
} else {
// static partition and list bucketing
generateActualTasks(conf, resTsks, trgtSize, avgConditionSize, mvTask, mrTask,
- mrAndMvTask, dirPath, inpFs, ctx, work, lbLevel, manifestFilePresent);
+ mrAndMvTask, dirPath, inpFs, ctx, work, lbLevel, manifestFilePresent, storageHandler);
}
}
+ } else if (useCustomStorageHandler) {
+ generateActualTasks(conf, resTsks, trgtSize, avgConditionSize, mvTask, mrTask,
+ mrAndMvTask, dirPath, inpFs, ctx, work, 0, false, storageHandler);
} else {
Utilities.FILE_OP_LOGGER.info("Resolver returning movetask for " + dirPath);
resTsks.add(mvTask);
}
} catch (IOException e) {
LOG.warn("Exception while getting tasks", e);
+ } catch (ClassNotFoundException | HiveException e) {
+ throw new RuntimeException("Failed to load storage handler: " + e.getMessage());
}
// Only one of the tasks should ever be added to resTsks
@@ -254,18 +286,26 @@ public class ConditionalResolverMergeFiles implements ConditionalResolver,
long trgtSize, long avgConditionSize, Task<?> mvTask,
Task<?> mrTask, Task<?> mrAndMvTask, Path dirPath,
FileSystem inpFs, ConditionalResolverMergeFilesCtx ctx, MapWork work, int dpLbLevel,
- boolean manifestFilePresent)
- throws IOException {
+ boolean manifestFilePresent, HiveStorageHandler storageHandler)
+ throws IOException, ClassNotFoundException {
DynamicPartitionCtx dpCtx = ctx.getDPCtx();
List<FileStatus> statusList;
- Map<FileStatus, List<FileStatus>> manifestDirToFile = new HashMap<>();
+ Map<FileStatus, List<FileStatus>> parentDirToFile = new HashMap<>();
+ boolean useCustomStorageHandler = storageHandler != null && storageHandler.supportsMergeFiles();
+ MergeTaskProperties mergeProperties = useCustomStorageHandler ?
+ storageHandler.getMergeTaskProperties(ctx.getTaskProperties()) : null;
if (manifestFilePresent) {
// Get the list of files from manifest file.
List<FileStatus> fileStatuses = getManifestFilePaths(conf, dirPath);
// Setup the work to include all the files present in the manifest.
setupWorkWhenUsingManifestFile(work, fileStatuses, dirPath, false);
- manifestDirToFile = getManifestDirs(inpFs, fileStatuses);
- statusList = new ArrayList<>(manifestDirToFile.keySet());
+ parentDirToFile = getParentDirToFileMap(inpFs, fileStatuses);
+ statusList = Lists.newArrayList(parentDirToFile.keySet());
+ } else if (useCustomStorageHandler) {
+ List<FileStatus> fileStatuses = storageHandler.getMergeTaskInputFiles(ctx.getTaskProperties());
+ setupWorkWithCustomHandler(work, dirPath, mergeProperties);
+ parentDirToFile = getParentDirToFileMap(inpFs, fileStatuses);
+ statusList = Lists.newArrayList(parentDirToFile.keySet());
} else {
statusList = HiveStatsUtils.getFileStatusRecurse(dirPath, dpLbLevel, inpFs);
}
@@ -295,8 +335,8 @@ public class ConditionalResolverMergeFiles implements ConditionalResolver,
List<Path> toMerge = new ArrayList<>();
for (int i = 0; i < status.length; ++i) {
long len;
- if (manifestFilePresent) {
- len = getMergeSize(manifestDirToFile.get(status[i]), avgConditionSize);
+ if (manifestFilePresent || useCustomStorageHandler) {
+ len = getMergeSize(parentDirToFile.get(status[i]), avgConditionSize);
} else {
len = getMergeSize(inpFs, status[i].getPath(), avgConditionSize);
}
@@ -309,12 +349,15 @@ public class ConditionalResolverMergeFiles implements ConditionalResolver,
Utilities.FILE_OP_LOGGER.warn("merger ignoring invalid DP path " + status[i].getPath());
continue;
}
+ if (useCustomStorageHandler) {
+ updatePartDescProperties(pDesc, mergeProperties);
+ }
Utilities.FILE_OP_LOGGER.debug("merge resolver will merge " + status[i].getPath());
work.resolveDynamicPartitionStoredAsSubDirsMerge(conf, status[i].getPath(), tblDesc,
aliases, pDesc);
// Do not add input file since it's already added when the manifest file is present.
- if (manifestFilePresent) {
- toMerge.addAll(manifestDirToFile.get(status[i])
+ if (manifestFilePresent || useCustomStorageHandler) {
+ toMerge.addAll(parentDirToFile.get(status[i])
.stream().map(FileStatus::getPath).collect(Collectors.toList()));
} else {
toMerge.add(status[i].getPath());
@@ -512,7 +555,30 @@ public class ConditionalResolverMergeFiles implements ConditionalResolver,
mapWork.setUseInputPathsDirectly(true);
}
- private Map<FileStatus, List<FileStatus>> getManifestDirs(FileSystem inpFs, List<FileStatus> fileStatuses)
+ private void setupWorkWithCustomHandler(MapWork mapWork, Path dirPath,
+ MergeTaskProperties mergeProperties) throws IOException, ClassNotFoundException {
+ Map<String, Operator<? extends OperatorDesc>> aliasToWork = mapWork.getAliasToWork();
+ Map<Path, PartitionDesc> pathToPartitionInfo = mapWork.getPathToPartitionInfo();
+ Operator<? extends OperatorDesc> op = aliasToWork.get(dirPath.toString());
+ PartitionDesc partitionDesc = pathToPartitionInfo.get(dirPath);
+ Path tmpDir = mergeProperties.getTmpLocation();
+ if (op != null) {
+ aliasToWork.remove(dirPath.toString());
+ aliasToWork.put(tmpDir.toString(), op);
+ mapWork.setAliasToWork(aliasToWork);
+ }
+ if (partitionDesc != null) {
+ updatePartDescProperties(partitionDesc, mergeProperties);
+ pathToPartitionInfo.remove(dirPath);
+ pathToPartitionInfo.put(tmpDir, partitionDesc);
+ mapWork.setPathToPartitionInfo(pathToPartitionInfo);
+ }
+ mapWork.removePathToAlias(dirPath);
+ mapWork.addPathToAlias(tmpDir, tmpDir.toString());
+ mapWork.setUseInputPathsDirectly(true);
+ }
+
+ private Map<FileStatus, List<FileStatus>> getParentDirToFileMap(FileSystem inpFs, List<FileStatus> fileStatuses)
throws IOException {
Map<FileStatus, List<FileStatus>> manifestDirsToPaths = new HashMap<>();
for (FileStatus fileStatus : fileStatuses) {
@@ -527,4 +593,22 @@ public class ConditionalResolverMergeFiles implements ConditionalResolver,
}
return manifestDirsToPaths;
}
+
+ private void updatePartDescProperties(PartitionDesc partitionDesc,
+ MergeTaskProperties mergeProperties) throws IOException, ClassNotFoundException {
+ if (mergeProperties != null) {
+ String inputFileFormatClassName = mergeProperties.getStorageFormatDescriptor().getInputFormat();
+ String outputFileFormatClassName = mergeProperties.getStorageFormatDescriptor().getOutputFormat();
+ String serdeClassName = mergeProperties.getStorageFormatDescriptor().getSerde();
+ if (inputFileFormatClassName != null) {
+ partitionDesc.setInputFileFormatClass(JavaUtils.loadClass(inputFileFormatClassName));
+ }
+ if (outputFileFormatClassName != null) {
+ partitionDesc.setOutputFileFormatClass(JavaUtils.loadClass(outputFileFormatClassName));
+ }
+ if (serdeClassName != null) {
+ partitionDesc.getTableDesc().getProperties().setProperty(serdeConstants.SERIALIZATION_LIB, serdeClassName);
+ }
+ }
+ }
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MergeTaskProperties.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MergeTaskProperties.java
new file mode 100644
index 00000000000..6083b1b117f
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MergeTaskProperties.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.StorageFormatDescriptor;
+
+import java.io.IOException;
+
+public interface MergeTaskProperties {
+ public Path getTmpLocation();
+
+ public StorageFormatDescriptor getStorageFormatDescriptor() throws IOException;
+}