This is an automated email from the ASF dual-hosted git repository.
sbadhya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 023c8b5ed54 HIVE-28258: Use Iceberg semantics for Merge task (#5251) (Sourabh Badhya reviewed by Krisztian Kasa, Denys Kuzmenko)
023c8b5ed54 is described below
commit 023c8b5ed54c49f166103428ab0b7d247df826c3
Author: Sourabh Badhya <[email protected]>
AuthorDate: Wed Jun 19 11:26:07 2024 +0530
HIVE-28258: Use Iceberg semantics for Merge task (#5251) (Sourabh Badhya reviewed by Krisztian Kasa, Denys Kuzmenko)
---
.../org/apache/iceberg/mr/hive/FilesForCommit.java | 16 +-
.../iceberg/mr/hive/HiveIcebergInputFormat.java | 12 +-
.../mr/hive/HiveIcebergOutputCommitter.java | 177 +++++++++--
.../iceberg/mr/hive/HiveIcebergStorageHandler.java | 43 +--
.../mr/hive/IcebergMergeTaskProperties.java | 36 ++-
.../apache/iceberg/mr/hive/IcebergTableUtil.java | 1 +
.../mapred/AbstractMapredIcebergRecordReader.java | 4 +-
.../mr/mapred/MapredIcebergInputFormat.java | 14 +-
.../mr/mapreduce/AbstractIcebergRecordReader.java | 156 +++++++++
.../iceberg/mr/mapreduce/IcebergInputFormat.java | 353 +--------------------
.../mr/mapreduce/IcebergMergeRecordReader.java | 169 ++++++++++
.../{IcebergSplit.java => IcebergMergeSplit.java} | 63 ++--
.../iceberg/mr/mapreduce/IcebergRecordReader.java | 308 ++++++++++++++++++
.../apache/iceberg/mr/mapreduce/IcebergSplit.java | 2 +-
.../mr/mapreduce/IcebergSplitContainer.java | 4 +-
.../hadoop/hive/ql/io/CombineHiveInputFormat.java | 18 +-
.../hadoop/hive/ql/io/CombineHiveRecordReader.java | 31 +-
.../ql/plan/ConditionalResolverMergeFiles.java | 23 +-
.../org/apache/hadoop/hive/ql/plan/MapWork.java | 22 +-
.../hadoop/hive/ql/plan/MergeTaskProperties.java | 8 +-
20 files changed, 930 insertions(+), 530 deletions(-)
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/FilesForCommit.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/FilesForCommit.java
index 2e25f5a8c2e..1ca3872b424 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/FilesForCommit.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/FilesForCommit.java
@@ -24,6 +24,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import org.apache.hadoop.fs.Path;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
@@ -35,27 +36,31 @@ public class FilesForCommit implements Serializable {
private final Collection<DeleteFile> deleteFiles;
private final Collection<DataFile> replacedDataFiles;
private final Collection<CharSequence> referencedDataFiles;
+ private final Collection<Path> mergedAndDeletedFiles;
public FilesForCommit(Collection<DataFile> dataFiles, Collection<DeleteFile> deleteFiles) {
this(dataFiles, deleteFiles, Collections.emptyList());
}
public FilesForCommit(Collection<DataFile> dataFiles, Collection<DeleteFile> deleteFiles,
- Collection<DataFile> replacedDataFiles, Collection<CharSequence> referencedDataFiles) {
+ Collection<DataFile> replacedDataFiles, Collection<CharSequence> referencedDataFiles,
+ Collection<Path> mergedAndDeletedFiles) {
this.dataFiles = dataFiles;
this.deleteFiles = deleteFiles;
this.replacedDataFiles = replacedDataFiles;
this.referencedDataFiles = referencedDataFiles;
+ this.mergedAndDeletedFiles = mergedAndDeletedFiles;
}
public FilesForCommit(Collection<DataFile> dataFiles, Collection<DeleteFile> deleteFiles,
Collection<DataFile> replacedDataFiles) {
- this(dataFiles, deleteFiles, replacedDataFiles, Collections.emptySet());
+ this(dataFiles, deleteFiles, replacedDataFiles, Collections.emptySet(), Collections.emptySet());
}
public static FilesForCommit onlyDelete(Collection<DeleteFile> deleteFiles,
Collection<CharSequence> referencedDataFiles) {
- return new FilesForCommit(Collections.emptyList(), deleteFiles, Collections.emptyList(), referencedDataFiles);
+ return new FilesForCommit(Collections.emptyList(), deleteFiles, Collections.emptyList(),
+ referencedDataFiles, Collections.emptySet());
}
public static FilesForCommit onlyData(Collection<DataFile> dataFiles) {
@@ -86,6 +91,10 @@ public class FilesForCommit implements Serializable {
return referencedDataFiles;
}
+ public Collection<Path> mergedAndDeletedFiles() {
+ return mergedAndDeletedFiles;
+ }
+
public Collection<? extends ContentFile> allFiles() {
return Stream.concat(dataFiles.stream(), deleteFiles.stream()).collect(Collectors.toList());
}
@@ -101,6 +110,7 @@ public class FilesForCommit implements Serializable {
.add("deleteFiles", deleteFiles.toString())
.add("replacedDataFiles", replacedDataFiles.toString())
.add("referencedDataFiles", referencedDataFiles.toString())
+ .add("mergedAndDeletedFiles", mergedAndDeletedFiles.toString())
.toString();
}
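As a minimal sketch of how a merge writer might populate the new mergedAndDeletedFiles collection: only the FilesForCommit constructor shown above is assumed; the helper name and its inputs are hypothetical.

    // Sketch: wrap a merge task's outputs plus the paths of the inputs it merged away,
    // so that commitJob() can later filter those inputs out of the final Iceberg commit.
    static FilesForCommit forMergeTask(Collection<DataFile> mergedOutputs, Collection<DataFile> consumedInputs) {
      Collection<Path> mergedAndDeleted = consumedInputs.stream()
          .map(file -> new Path(String.valueOf(file.path())))
          .collect(Collectors.toList());
      return new FilesForCommit(mergedOutputs, Collections.emptyList(), Collections.emptyList(),
          Collections.emptySet(), mergedAndDeleted);
    }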
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
index 1ea78eeba54..98fd4b30163 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.mr.hive;
import java.io.IOException;
import java.util.Arrays;
+import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.TableName;
@@ -39,6 +40,7 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
@@ -56,6 +58,7 @@ import org.apache.iceberg.mr.mapred.AbstractMapredIcebergRecordReader;
import org.apache.iceberg.mr.mapred.Container;
import org.apache.iceberg.mr.mapred.MapredIcebergInputFormat;
import org.apache.iceberg.mr.mapreduce.IcebergInputFormat;
+import org.apache.iceberg.mr.mapreduce.IcebergMergeSplit;
import org.apache.iceberg.mr.mapreduce.IcebergSplit;
import org.apache.iceberg.mr.mapreduce.IcebergSplitContainer;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -64,7 +67,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HiveIcebergInputFormat extends MapredIcebergInputFormat<Record>
- implements CombineHiveInputFormat.AvoidSplitCombination, VectorizedInputFormatInterface,
+ implements CombineHiveInputFormat.MergeSplits, VectorizedInputFormatInterface,
LlapCacheOnlyInputFormatInterface.VectorizedOnly {
private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergInputFormat.class);
@@ -225,4 +228,11 @@ public class HiveIcebergInputFormat extends MapredIcebergInputFormat<Record>
String dbAndTableName = TableName.fromString(tableName, null, null).getNotEmptyDbTable();
return ICEBERG_DISABLE_VECTORIZATION_PREFIX + dbAndTableName;
}
+
+ @Override
+ public FileSplit createMergeSplit(Configuration conf,
+ CombineHiveInputFormat.CombineHiveInputSplit split,
+ Integer partition, Properties properties) throws IOException {
+ return new IcebergMergeSplit(conf, split, partition, properties);
+ }
}
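A rough sketch of the caller side of the new MergeSplits hook follows; the surrounding variables (jobConf, combineSplit, splitProperties) and the calling context, which must handle IOException, are assumptions, and only createMergeSplit's signature from the hunk above is taken as given.

    // Sketch: ask the Iceberg input format to turn one file of a combined split
    // (identified by its index) into an IcebergMergeSplit a merge task can read back.
    HiveIcebergInputFormat inputFormat = new HiveIcebergInputFormat();
    FileSplit mergeSplit = inputFormat.createMergeSplit(jobConf, combineSplit, 0, splitProperties);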
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
index d9f3116ff84..cb61d545b3d 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
@@ -52,12 +52,14 @@ import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.session.SessionStateUtil;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.JobContextImpl;
import org.apache.hadoop.mapred.OutputCommitter;
import org.apache.hadoop.mapred.TaskAttemptContext;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.DeleteFiles;
@@ -83,6 +85,7 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.iceberg.util.Tasks;
@@ -97,6 +100,11 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
private static final String FOR_COMMIT_EXTENSION = ".forCommit";
private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergOutputCommitter.class);
+ private static final HiveIcebergOutputCommitter OUTPUT_COMMITTER = new HiveIcebergOutputCommitter();
+
+ public static HiveIcebergOutputCommitter getInstance() {
+ return OUTPUT_COMMITTER;
+ }
@Override
public void setupJob(JobContext jobContext) {
@@ -126,6 +134,7 @@ public class HiveIcebergOutputCommitter extends
OutputCommitter {
TaskAttemptID attemptID = context.getTaskAttemptID();
JobConf jobConf = context.getJobConf();
+ Set<Path> mergedPaths = getCombinedLocations(jobConf);
Set<String> outputs =
HiveIcebergStorageHandler.outputTables(context.getJobConf());
Map<String, List<HiveIcebergWriter>> writers =
Optional.ofNullable(WriterRegistry.writers(attemptID))
.orElseGet(() -> {
@@ -158,7 +167,8 @@ public class HiveIcebergOutputCommitter extends
OutputCommitter {
replacedDataFiles.addAll(files.replacedDataFiles());
referencedDataFiles.addAll(files.referencedDataFiles());
}
- createFileForCommit(new FilesForCommit(dataFiles, deleteFiles,
replacedDataFiles, referencedDataFiles),
+ createFileForCommit(new FilesForCommit(dataFiles, deleteFiles,
replacedDataFiles, referencedDataFiles,
+ mergedPaths),
fileForCommitLocation, table.io());
} else {
LOG.info("CommitTask found no writer for specific table: {},
attemptID: {}", output, attemptID);
@@ -170,8 +180,6 @@ public class HiveIcebergOutputCommitter extends
OutputCommitter {
LOG.info("CommitTask found no serialized table in config for
table: {}.", output);
}
}, IOException.class);
-
- cleanMergeTaskInputFiles(jobConf, tableExecutor, context);
} finally {
if (tableExecutor != null) {
tableExecutor.shutdown();
@@ -281,6 +289,9 @@ public class HiveIcebergOutputCommitter extends
OutputCommitter {
.collect(Collectors.toList()));
commitTable(table.io(), fileExecutor, output, operation);
});
+
+ // Cleanup any merge input files.
+ cleanMergeTaskInputFiles(jobContextList, tableExecutor);
} finally {
fileExecutor.shutdown();
if (tableExecutor != null) {
@@ -423,6 +434,7 @@ public class HiveIcebergOutputCommitter extends
OutputCommitter {
List<DeleteFile> deleteFiles = Lists.newArrayList();
List<DataFile> replacedDataFiles = Lists.newArrayList();
Set<CharSequence> referencedDataFiles = Sets.newHashSet();
+ Set<Path> mergedAndDeletedFiles = Sets.newHashSet();
Table table = null;
String branchName = null;
@@ -459,9 +471,14 @@ public class HiveIcebergOutputCommitter extends
OutputCommitter {
deleteFiles.addAll(writeResults.deleteFiles());
replacedDataFiles.addAll(writeResults.replacedDataFiles());
referencedDataFiles.addAll(writeResults.referencedDataFiles());
+ mergedAndDeletedFiles.addAll(writeResults.mergedAndDeletedFiles());
}
- FilesForCommit filesForCommit = new FilesForCommit(dataFiles, deleteFiles, replacedDataFiles, referencedDataFiles);
+ dataFiles.removeIf(dataFile -> mergedAndDeletedFiles.contains(new Path(String.valueOf(dataFile.path()))));
+ deleteFiles.removeIf(deleteFile -> mergedAndDeletedFiles.contains(new Path(String.valueOf(deleteFile.path()))));
+
+ FilesForCommit filesForCommit = new FilesForCommit(dataFiles, deleteFiles, replacedDataFiles, referencedDataFiles,
+ Collections.emptySet());
long startTime = System.currentTimeMillis();
if (Operation.IOW != operation) {
@@ -687,6 +704,7 @@ public class HiveIcebergOutputCommitter extends
OutputCommitter {
Collection<DeleteFile> deleteFiles = new ConcurrentLinkedQueue<>();
Collection<DataFile> replacedDataFiles = new ConcurrentLinkedQueue<>();
Collection<CharSequence> referencedDataFiles = new
ConcurrentLinkedQueue<>();
+ Collection<Path> mergedAndDeletedFiles = new ConcurrentLinkedQueue<>();
Tasks.range(numTasks)
.throwFailureWhenFinished(throwOnFailure)
.executeWith(executor)
@@ -698,9 +716,10 @@ public class HiveIcebergOutputCommitter extends
OutputCommitter {
deleteFiles.addAll(files.deleteFiles());
replacedDataFiles.addAll(files.replacedDataFiles());
referencedDataFiles.addAll(files.referencedDataFiles());
+ mergedAndDeletedFiles.addAll(files.mergedAndDeletedFiles());
});
- return new FilesForCommit(dataFiles, deleteFiles, replacedDataFiles,
referencedDataFiles);
+ return new FilesForCommit(dataFiles, deleteFiles, replacedDataFiles,
referencedDataFiles, mergedAndDeletedFiles);
}
/**
@@ -747,11 +766,18 @@ public class HiveIcebergOutputCommitter extends
OutputCommitter {
}
}
+ /**
+ * Generates a list of file statuses of the output files in the jobContexts.
+ * @param jobContexts List of jobContexts
+ * @return Returns the list of file statuses of the output files in the jobContexts
+ * @throws IOException Throws IOException
+ */
public List<FileStatus> getOutputFiles(List<JobContext> jobContexts) throws IOException {
List<OutputTable> outputs = collectOutputs(jobContexts);
ExecutorService fileExecutor =
fileExecutor(jobContexts.get(0).getJobConf());
ExecutorService tableExecutor =
tableExecutor(jobContexts.get(0).getJobConf(), outputs.size());
- Collection<FileStatus> dataFiles = new ConcurrentLinkedQueue<>();
+ Map<Path, List<FileStatus>> parentDirToDataFile = Maps.newConcurrentMap();
+ Map<Path, List<FileStatus>> parentDirToDeleteFile =
Maps.newConcurrentMap();
try {
Tasks.foreach(outputs.stream().flatMap(kv -> kv.jobContexts.stream()
.map(jobContext -> new SimpleImmutableEntry<>(kv.table,
jobContext))))
@@ -773,8 +799,15 @@ public class HiveIcebergOutputCommitter extends
OutputCommitter {
FilesForCommit results = collectResults(numTasks,
fileExecutor, table.location(), jobContext,
table.io(), false);
for (DataFile dataFile : results.dataFiles()) {
- FileStatus fileStatus = fileSystem.getFileStatus(new
Path(dataFile.path().toString()));
- dataFiles.add(fileStatus);
+ Path filePath = new Path(dataFile.path().toString());
+ FileStatus fileStatus = fileSystem.getFileStatus(filePath);
+ parentDirToDataFile.computeIfAbsent(filePath.getParent(), k
-> Lists.newArrayList()).add(fileStatus);
+ }
+ for (DeleteFile deleteFile : results.deleteFiles()) {
+ Path filePath = new Path(deleteFile.path().toString());
+ FileStatus fileStatus = fileSystem.getFileStatus(filePath);
+ parentDirToDeleteFile.computeIfAbsent(filePath.getParent(),
+ k -> Lists.newArrayList()).add(fileStatus);
}
}, IOException.class);
} finally {
@@ -783,28 +816,126 @@ public class HiveIcebergOutputCommitter extends
OutputCommitter {
tableExecutor.shutdown();
}
}
- return Lists.newArrayList(dataFiles);
+ List<FileStatus> dataFiles = Lists.newArrayList();
+ dataFiles.addAll(parentDirToDataFile.values().stream()
+ .flatMap(List::stream).collect(Collectors.toList()));
+ dataFiles.addAll(parentDirToDeleteFile.values().stream()
+ .flatMap(List::stream).collect(Collectors.toList()));
+ return dataFiles;
}
- private void cleanMergeTaskInputFiles(JobConf jobConf,
- ExecutorService tableExecutor,
- TaskAttemptContext context) throws
IOException {
+ /**
+ * Generates a list of ContentFile objects of the output files in the jobContexts.
+ * @param jobContexts List of jobContexts
+ * @return Returns the list of ContentFile objects of the output files in the jobContexts
+ * @throws IOException Throws IOException
+ */
+ public List<ContentFile> getOutputContentFiles(List<JobContext> jobContexts) throws IOException {
+ List<OutputTable> outputs = collectOutputs(jobContexts);
+ ExecutorService fileExecutor =
fileExecutor(jobContexts.get(0).getJobConf());
+ ExecutorService tableExecutor =
tableExecutor(jobContexts.get(0).getJobConf(), outputs.size());
+ Collection<ContentFile> files = new ConcurrentLinkedQueue<>();
+ try {
+ Tasks.foreach(outputs.stream().flatMap(kv -> kv.jobContexts.stream()
+ .map(jobContext -> new SimpleImmutableEntry<>(kv.table,
jobContext))))
+ .suppressFailureWhenFinished()
+ .executeWith(tableExecutor)
+ .onFailure((output, exc) -> LOG.warn("Failed to retrieve merge
input file for the table {}", output, exc))
+ .run(output -> {
+ JobContext jobContext = output.getValue();
+ JobConf jobConf = jobContext.getJobConf();
+ LOG.info("Cleaning job for jobID: {}, table: {}",
jobContext.getJobID(), output);
+
+ Table table = output.getKey();
+ FileSystem fileSystem = new
Path(table.location()).getFileSystem(jobConf);
+ String jobLocation = generateJobLocation(table.location(),
jobConf, jobContext.getJobID());
+ // list jobLocation to get number of forCommit files
+ // we do this because map/reduce num in jobConf is unreliable
+ // and we have no access to vertex status info
+ int numTasks = listForCommits(jobConf, jobLocation).size();
+ FilesForCommit results = collectResults(numTasks,
fileExecutor, table.location(), jobContext,
+ table.io(), false);
+ files.addAll(results.dataFiles());
+ files.addAll(results.deleteFiles());
+ }, IOException.class);
+ } finally {
+ fileExecutor.shutdown();
+ if (tableExecutor != null) {
+ tableExecutor.shutdown();
+ }
+ }
+ return Lists.newArrayList(files);
+ }
+
+ private void cleanMergeTaskInputFiles(List<JobContext> jobContexts,
+ ExecutorService tableExecutor) throws
IOException {
// Merge task has merged several files into one. Hence we need to remove
the stale files.
// At this stage the file is written and task-committed, but the old files
are still present.
+ for (JobContext jobContext : jobContexts) {
+ JobConf jobConf = jobContext.getJobConf();
+ if (jobConf.getInputFormat().getClass().isAssignableFrom(CombineHiveInputFormat.class)) {
+ MapWork mrwork = Utilities.getMapWork(jobConf);
+ if (mrwork != null) {
+ List<Path> mergedPaths = mrwork.getInputPaths();
+ if (mergedPaths != null) {
+ Tasks.foreach(mergedPaths)
+ .retry(3)
+ .executeWith(tableExecutor)
+ .run(path -> {
+ FileSystem fs = path.getFileSystem(jobConf);
+ if (fs.exists(path)) {
+ fs.delete(path, true);
+ }
+ }, IOException.class);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Generates {@link JobContext}s for the OutputCommitter for the specific table.
+ * @param configuration The configuration used as a base of the JobConf
+ * @param tableName The name of the table we are planning to commit
+ * @param branchName the name of the branch
+ * @return The generated JobContext list, or an empty list if no commit information is present.
+ */
+ static List<JobContext> generateJobContext(Configuration configuration, String tableName,
+ String branchName) {
+ JobConf jobConf = new JobConf(configuration);
+ Optional<Map<String, SessionStateUtil.CommitInfo>> commitInfoMap =
+ SessionStateUtil.getCommitInfo(jobConf, tableName);
+ if (commitInfoMap.isPresent()) {
+ List<JobContext> jobContextList = Lists.newLinkedList();
+ for (SessionStateUtil.CommitInfo commitInfo :
commitInfoMap.get().values()) {
+ org.apache.hadoop.mapred.JobID jobID =
org.apache.hadoop.mapred.JobID.forName(commitInfo.getJobIdStr());
+ commitInfo.getProps().forEach(jobConf::set);
+
+ // we should only commit this current table because
+ // for multi-table inserts, this hook method will be called
sequentially for each target table
+ jobConf.set(InputFormatConfig.OUTPUT_TABLES, tableName);
+ if (branchName != null) {
+ jobConf.set(InputFormatConfig.OUTPUT_TABLE_SNAPSHOT_REF, branchName);
+ }
+
+ jobContextList.add(new JobContextImpl(jobConf, jobID, null));
+ }
+ return jobContextList;
+ } else {
+ // most likely empty write scenario
+ LOG.debug("Unable to find commit information in query state for table:
{}", tableName);
+ return Collections.emptyList();
+ }
+ }
+
+ private Set<Path> getCombinedLocations(JobConf jobConf) {
+ Set<Path> mergedPaths = Sets.newHashSet();
if (jobConf.getInputFormat().getClass().isAssignableFrom(CombineHiveInputFormat.class)) {
MapWork mrwork = Utilities.getMapWork(jobConf);
- if (mrwork != null) {
- List<Path> mergedPaths = mrwork.getInputPaths();
- if (mergedPaths != null) {
- Tasks.foreach(mergedPaths)
- .retry(3)
- .executeWith(tableExecutor)
- .run(path -> {
- FileSystem fs = path.getFileSystem(context.getJobConf());
- fs.delete(path, true);
- }, IOException.class);
- }
+ if (mrwork != null && mrwork.getInputPaths() != null) {
+ mergedPaths.addAll(mrwork.getInputPaths());
}
}
+ return mergedPaths;
}
}
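To show how the relocated generateJobContext and the new getOutputContentFiles compose, a hedged sketch; the helper name is hypothetical and package-private access to generateJobContext is assumed.

    // Sketch: resolve every ContentFile a query wrote for one table so a merge task
    // can plan over them; returns an empty list for the empty-write scenario.
    static List<ContentFile> writtenFiles(Configuration conf, String tableName, String branchName)
        throws IOException {
      List<JobContext> jobContexts = HiveIcebergOutputCommitter.generateJobContext(conf, tableName, branchName);
      if (jobContexts.isEmpty()) {
        return Collections.emptyList();
      }
      return HiveIcebergOutputCommitter.getInstance().getOutputContentFiles(jobContexts);
    }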
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index 3157fbb0411..7ccb4bfb3bf 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -132,8 +132,6 @@ import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext;
-import org.apache.hadoop.mapred.JobContextImpl;
-import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
@@ -763,7 +761,8 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
if (location != null) {
HiveTableUtil.cleanupTableObjectFile(location, configuration);
}
- List<JobContext> jobContextList = generateJobContext(configuration, tableName, snapshotRef);
+ List<JobContext> jobContextList = HiveIcebergOutputCommitter
+ .generateJobContext(configuration, tableName, snapshotRef);
if (jobContextList.isEmpty()) {
return;
}
@@ -1569,41 +1568,6 @@ public class HiveIcebergStorageHandler implements
HiveStoragePredicateHandler, H
return true;
}
- /**
- * Generates {@link JobContext}s for the OutputCommitter for the specific
table.
- * @param configuration The configuration used for as a base of the JobConf
- * @param tableName The name of the table we are planning to commit
- * @param branchName the name of the branch
- * @return The generated Optional JobContext list or empty if not presents.
- */
- private List<JobContext> generateJobContext(Configuration configuration,
String tableName,
- String branchName) {
- JobConf jobConf = new JobConf(configuration);
- Optional<Map<String, SessionStateUtil.CommitInfo>> commitInfoMap =
- SessionStateUtil.getCommitInfo(jobConf, tableName);
- if (commitInfoMap.isPresent()) {
- List<JobContext> jobContextList = Lists.newLinkedList();
- for (SessionStateUtil.CommitInfo commitInfo :
commitInfoMap.get().values()) {
- JobID jobID = JobID.forName(commitInfo.getJobIdStr());
- commitInfo.getProps().forEach(jobConf::set);
-
- // we should only commit this current table because
- // for multi-table inserts, this hook method will be called
sequentially for each target table
- jobConf.set(InputFormatConfig.OUTPUT_TABLES, tableName);
- if (branchName != null) {
- jobConf.set(InputFormatConfig.OUTPUT_TABLE_SNAPSHOT_REF, branchName);
- }
-
- jobContextList.add(new JobContextImpl(jobConf, jobID, null));
- }
- return jobContextList;
- } else {
- // most likely empty write scenario
- LOG.debug("Unable to find commit information in query state for table:
{}", tableName);
- return Collections.emptyList();
- }
- }
-
private String getOperationType() {
return SessionStateUtil.getProperty(conf, Operation.class.getSimpleName())
.orElse(Operation.OTHER.name());
@@ -2172,7 +2136,8 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
String tableName = properties.getProperty(Catalogs.NAME);
String snapshotRef = properties.getProperty(Catalogs.SNAPSHOT_REF);
Configuration configuration = SessionState.getSessionConf();
- List<JobContext> originalContextList = generateJobContext(configuration, tableName, snapshotRef);
+ List<JobContext> originalContextList = HiveIcebergOutputCommitter
+ .generateJobContext(configuration, tableName, snapshotRef);
List<JobContext> jobContextList = originalContextList.stream()
.map(TezUtil::enrichContextWithVertexId)
.collect(Collectors.toList());
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergMergeTaskProperties.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergMergeTaskProperties.java
index ff47a801a5b..e3e6df6ceba 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergMergeTaskProperties.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergMergeTaskProperties.java
@@ -19,19 +19,20 @@
package org.apache.iceberg.mr.hive;
import java.io.IOException;
+import java.util.List;
import java.util.Properties;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.io.StorageFormatDescriptor;
-import org.apache.hadoop.hive.ql.io.StorageFormatFactory;
import org.apache.hadoop.hive.ql.plan.MergeTaskProperties;
-import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.TableProperties;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.iceberg.ContentFile;
import org.apache.iceberg.mr.Catalogs;
public class IcebergMergeTaskProperties implements MergeTaskProperties {
private final Properties properties;
- private static final StorageFormatFactory storageFormatFactory = new
StorageFormatFactory();
IcebergMergeTaskProperties(Properties properties) {
this.properties = properties;
@@ -42,14 +43,23 @@ public class IcebergMergeTaskProperties implements MergeTaskProperties {
return new Path(location + "/data/");
}
- public StorageFormatDescriptor getStorageFormatDescriptor() throws
IOException {
- FileFormat fileFormat =
FileFormat.fromString(properties.getProperty(TableProperties.DEFAULT_FILE_FORMAT,
- TableProperties.DEFAULT_FILE_FORMAT_DEFAULT));
- StorageFormatDescriptor descriptor =
storageFormatFactory.get(fileFormat.name());
- if (descriptor == null) {
- throw new IOException("Unsupported storage format descriptor");
+ @Override
+ public Properties getSplitProperties() throws IOException {
+ String tableName = properties.getProperty(Catalogs.NAME);
+ String snapshotRef = properties.getProperty(Catalogs.SNAPSHOT_REF);
+ Configuration configuration = SessionState.getSessionConf();
+ List<JobContext> originalContextList = HiveIcebergOutputCommitter
+ .generateJobContext(configuration, tableName, snapshotRef);
+ List<JobContext> jobContextList = originalContextList.stream()
+ .map(TezUtil::enrichContextWithVertexId)
+ .collect(Collectors.toList());
+ if (jobContextList.isEmpty()) {
+ return null;
}
- return descriptor;
+ List<ContentFile> contentFiles = HiveIcebergOutputCommitter.getInstance().getOutputContentFiles(jobContextList);
+ Properties pathToContentFile = new Properties();
+ contentFiles.forEach(contentFile ->
+ pathToContentFile.put(new Path(String.valueOf(contentFile.path())), contentFile));
+ return pathToContentFile;
}
-
}
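A small, hypothetical illustration of how a merge-task planner could consume the Properties returned by getSplitProperties(); only the Path-to-ContentFile mapping established above is assumed, while tableProperties, inputFilePath and package-level access to the constructor are placeholders.

    // Sketch: getSplitProperties() keys each written file's Path to its Iceberg ContentFile,
    // letting a planner recover the format and size of a given input path.
    Properties splitProperties = new IcebergMergeTaskProperties(tableProperties).getSplitProperties();
    if (splitProperties != null) {
      ContentFile contentFile = (ContentFile) splitProperties.get(new Path(inputFilePath));
      long length = contentFile.fileSizeInBytes();   // e.g. used to size the merge split
    }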
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
index 79454f45ee8..653372d7988 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
@@ -344,4 +344,5 @@ public class IcebergTableUtil {
}
return data;
}
+
}
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapred/AbstractMapredIcebergRecordReader.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapred/AbstractMapredIcebergRecordReader.java
index 92e9c4a688d..2e892900d02 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapred/AbstractMapredIcebergRecordReader.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapred/AbstractMapredIcebergRecordReader.java
@@ -23,8 +23,8 @@ import java.io.IOException;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.iceberg.mr.mapreduce.IcebergSplit;
@SuppressWarnings("checkstyle:VisibilityModifier")
public abstract class AbstractMapredIcebergRecordReader<T> implements
RecordReader<Void, T> {
@@ -32,7 +32,7 @@ public abstract class AbstractMapredIcebergRecordReader<T> implements RecordRead
protected final org.apache.hadoop.mapreduce.RecordReader<Void, ?> innerReader;
public AbstractMapredIcebergRecordReader(org.apache.iceberg.mr.mapreduce.IcebergInputFormat<?> mapreduceInputFormat,
- IcebergSplit split, JobConf job, Reporter reporter) throws IOException {
+ InputSplit split, JobConf job, Reporter reporter) throws IOException {
TaskAttemptContext context = MapredIcebergInputFormat.newTaskAttemptContext(job, reporter);
try {
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapred/MapredIcebergInputFormat.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapred/MapredIcebergInputFormat.java
index 822ce52a7e4..473e8fe1486 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapred/MapredIcebergInputFormat.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapred/MapredIcebergInputFormat.java
@@ -37,7 +37,7 @@ import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.mr.InputFormatConfig;
-import org.apache.iceberg.mr.mapreduce.IcebergSplit;
+import org.apache.iceberg.mr.mapreduce.IcebergMergeSplit;
import org.apache.iceberg.mr.mapreduce.IcebergSplitContainer;
/**
@@ -75,8 +75,13 @@ public class MapredIcebergInputFormat<T> implements InputFormat<Void, Container<
@Override
public RecordReader<Void, Container<T>> getRecordReader(InputSplit split, JobConf job, Reporter reporter)
throws IOException {
- IcebergSplit icebergSplit = ((IcebergSplitContainer) split).icebergSplit();
- return new MapredIcebergRecordReader<>(innerInputFormat, icebergSplit, job, reporter);
+ try {
+ org.apache.hadoop.mapreduce.InputSplit inputSplit = split instanceof IcebergMergeSplit ?
+ (IcebergMergeSplit) split : ((IcebergSplitContainer) split).icebergSplit();
+ return new MapredIcebergRecordReader<>(innerInputFormat, inputSplit, job, reporter);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
}
@@ -85,7 +90,8 @@ public class MapredIcebergInputFormat<T> implements InputFormat<Void, Container<
private final long splitLength; // for getPos()
MapredIcebergRecordReader(org.apache.iceberg.mr.mapreduce.IcebergInputFormat<T> mapreduceInputFormat,
- IcebergSplit split, JobConf job, Reporter reporter) throws IOException {
+ org.apache.hadoop.mapreduce.InputSplit split, JobConf job, Reporter reporter)
+ throws IOException, InterruptedException {
super(mapreduceInputFormat, split, job, reporter);
splitLength = split.getLength();
}
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/AbstractIcebergRecordReader.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/AbstractIcebergRecordReader.java
new file mode 100644
index 00000000000..53ee0d870f8
--- /dev/null
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/AbstractIcebergRecordReader.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.mapreduce;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.data.InternalRecordWrapper;
+import org.apache.iceberg.expressions.Evaluator;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.mr.hive.HiveIcebergStorageHandler;
+import org.apache.iceberg.mr.hive.IcebergAcidUtil;
+
+public abstract class AbstractIcebergRecordReader<T> extends RecordReader<Void, T> {
+
+ private TaskAttemptContext context;
+ private Configuration conf;
+ private Table table;
+ private Schema expectedSchema;
+ private String nameMapping;
+ private boolean reuseContainers;
+ private boolean caseSensitive;
+ private InputFormatConfig.InMemoryDataModel inMemoryDataModel;
+ private boolean fetchVirtualColumns;
+
+ @Override
+ public void initialize(InputSplit split, TaskAttemptContext newContext) {
+ // For now IcebergInputFormat does its own split planning and does not
accept FileSplit instances
+ this.context = newContext;
+ this.conf = newContext.getConfiguration();
+ this.table = HiveIcebergStorageHandler.table(conf,
conf.get(InputFormatConfig.TABLE_IDENTIFIER));
+ HiveIcebergStorageHandler.checkAndSetIoConfig(conf, table);
+ this.nameMapping =
table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
+ this.caseSensitive = conf.getBoolean(InputFormatConfig.CASE_SENSITIVE,
InputFormatConfig.CASE_SENSITIVE_DEFAULT);
+ this.expectedSchema = readSchema(conf, table, caseSensitive);
+ this.reuseContainers = conf.getBoolean(InputFormatConfig.REUSE_CONTAINERS,
false);
+ this.inMemoryDataModel =
conf.getEnum(InputFormatConfig.IN_MEMORY_DATA_MODEL,
+ InputFormatConfig.InMemoryDataModel.GENERIC);
+ this.fetchVirtualColumns = InputFormatConfig.fetchVirtualColumns(conf);
+ }
+
+ private static Schema readSchema(Configuration conf, Table table, boolean
caseSensitive) {
+ Schema readSchema = InputFormatConfig.readSchema(conf);
+
+ if (readSchema != null) {
+ return readSchema;
+ }
+
+ String[] selectedColumns = InputFormatConfig.selectedColumns(conf);
+ readSchema = table.schema();
+
+ if (selectedColumns != null) {
+ readSchema =
+ caseSensitive ? readSchema.select(selectedColumns) :
readSchema.caseInsensitiveSelect(selectedColumns);
+ }
+
+ if (InputFormatConfig.fetchVirtualColumns(conf)) {
+ return
IcebergAcidUtil.createFileReadSchemaWithVirtualColums(readSchema.columns(),
table);
+ }
+
+ return readSchema;
+ }
+
+ CloseableIterable<T> applyResidualFiltering(CloseableIterable<T> iter,
Expression residual,
+ Schema readSchema) {
+ boolean applyResidual = !getContext().getConfiguration()
+ .getBoolean(InputFormatConfig.SKIP_RESIDUAL_FILTERING, false);
+
+ if (applyResidual && residual != null && residual !=
Expressions.alwaysTrue()) {
+ // Date and timestamp values are not the correct type for Evaluator.
+ // Wrapping to return the expected type.
+ InternalRecordWrapper wrapper = new
InternalRecordWrapper(readSchema.asStruct());
+ Evaluator filter = new Evaluator(readSchema.asStruct(), residual,
caseSensitive);
+ return CloseableIterable.filter(iter, record ->
filter.eval(wrapper.wrap((StructLike) record)));
+ } else {
+ return iter;
+ }
+ }
+
+ public TaskAttemptContext getContext() {
+ return context;
+ }
+
+ public Configuration getConf() {
+ return conf;
+ }
+
+ public boolean isReuseContainers() {
+ return reuseContainers;
+ }
+
+ public Schema getExpectedSchema() {
+ return expectedSchema;
+ }
+
+ public String getNameMapping() {
+ return nameMapping;
+ }
+
+ public Table getTable() {
+ return table;
+ }
+
+ public InputFormatConfig.InMemoryDataModel getInMemoryDataModel() {
+ return inMemoryDataModel;
+ }
+
+ public boolean isFetchVirtualColumns() {
+ return fetchVirtualColumns;
+ }
+
+ public boolean isCaseSensitive() {
+ return caseSensitive;
+ }
+
+ @Override
+ public Void getCurrentKey() {
+ return null;
+ }
+
+ @Override
+ public float getProgress() {
+ // TODO: We could give a more accurate progress based on records read from
the file. Context.getProgress does not
+ // have enough information to give an accurate progress value. This isn't
that easy, since we don't know how much
+ // of the input split has been processed and we are pushing filters into
Parquet and ORC. But we do know when a
+ // file is opened and could count the number of rows returned, so we can
estimate. And we could also add a row
+ // count to the readers so that we can get an accurate count of rows that
have been either returned or filtered
+ // out.
+ return getContext().getProgress();
+ }
+}
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
index efb3988688d..00e63740e28 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
@@ -21,23 +21,15 @@ package org.apache.iceberg.mr.mapreduce;
import java.io.IOException;
import java.io.UncheckedIOException;
-import java.util.Collections;
-import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
-import java.util.Set;
import java.util.concurrent.ExecutorService;
-import java.util.function.BiFunction;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.LlapHiveUtils;
-import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.metadata.AuthorizationException;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
import org.apache.hadoop.hive.ql.plan.MapWork;
@@ -49,54 +41,23 @@ import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.iceberg.CombinedScanTask;
-import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataTableScan;
-import org.apache.iceberg.DataTask;
-import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.IncrementalAppendScan;
-import org.apache.iceberg.MetadataColumns;
-import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.Partitioning;
import org.apache.iceberg.Scan;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SnapshotRef;
-import org.apache.iceberg.StructLike;
import org.apache.iceberg.SystemConfigs;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableScan;
-import org.apache.iceberg.avro.Avro;
-import org.apache.iceberg.common.DynMethods;
-import org.apache.iceberg.data.DeleteFilter;
-import org.apache.iceberg.data.GenericDeleteFilter;
-import org.apache.iceberg.data.IdentityPartitionConverters;
-import org.apache.iceberg.data.InternalRecordWrapper;
-import org.apache.iceberg.data.avro.DataReader;
-import org.apache.iceberg.data.orc.GenericOrcReader;
-import org.apache.iceberg.data.parquet.GenericParquetReaders;
-import org.apache.iceberg.encryption.EncryptedFiles;
-import org.apache.iceberg.expressions.Evaluator;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
-import org.apache.iceberg.hive.HiveVersion;
import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.io.CloseableIterator;
-import org.apache.iceberg.io.InputFile;
-import org.apache.iceberg.mapping.NameMappingParser;
import org.apache.iceberg.mr.Catalogs;
import org.apache.iceberg.mr.InputFormatConfig;
-import org.apache.iceberg.mr.hive.HiveIcebergInputFormat;
import org.apache.iceberg.mr.hive.HiveIcebergStorageHandler;
-import org.apache.iceberg.mr.hive.IcebergAcidUtil;
-import org.apache.iceberg.orc.ORC;
-import org.apache.iceberg.parquet.Parquet;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.types.Type;
-import org.apache.iceberg.types.TypeUtil;
-import org.apache.iceberg.types.Types;
-import org.apache.iceberg.util.PartitionUtil;
import org.apache.iceberg.util.SerializationUtil;
import org.apache.iceberg.util.ThreadPools;
@@ -293,319 +254,7 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
@Override
public RecordReader<Void, T> createRecordReader(InputSplit split, TaskAttemptContext context) {
- return new IcebergRecordReader<>();
+ return split instanceof IcebergMergeSplit ? new IcebergMergeRecordReader<>() : new IcebergRecordReader<>();
}
- private static final class IcebergRecordReader<T> extends RecordReader<Void,
T> {
-
- private static final String HIVE_VECTORIZED_READER_CLASS =
"org.apache.iceberg.mr.hive.vector.HiveVectorizedReader";
- private static final DynMethods.StaticMethod
HIVE_VECTORIZED_READER_BUILDER;
-
- static {
- if (HiveVersion.min(HiveVersion.HIVE_3)) {
- HIVE_VECTORIZED_READER_BUILDER = DynMethods.builder("reader")
- .impl(HIVE_VECTORIZED_READER_CLASS,
- Table.class,
- Path.class,
- FileScanTask.class,
- Map.class,
- TaskAttemptContext.class,
- Expression.class,
- Schema.class)
- .buildStatic();
- } else {
- HIVE_VECTORIZED_READER_BUILDER = null;
- }
- }
-
- private TaskAttemptContext context;
- private Configuration conf;
- private Schema expectedSchema;
- private String nameMapping;
- private boolean reuseContainers;
- private boolean caseSensitive;
- private InputFormatConfig.InMemoryDataModel inMemoryDataModel;
- private Iterator<FileScanTask> tasks;
- private T current;
- private CloseableIterator<T> currentIterator;
- private Table table;
- private boolean fetchVirtualColumns;
-
- @Override
- public void initialize(InputSplit split, TaskAttemptContext newContext) {
- // For now IcebergInputFormat does its own split planning and does not
accept FileSplit instances
- CombinedScanTask task = ((IcebergSplit) split).task();
- this.context = newContext;
- this.conf = newContext.getConfiguration();
- this.table = SerializationUtil.deserializeFromBase64(
- conf.get(InputFormatConfig.SERIALIZED_TABLE_PREFIX +
conf.get(InputFormatConfig.TABLE_IDENTIFIER)));
- HiveIcebergStorageHandler.checkAndSetIoConfig(conf, table);
- this.tasks = task.files().iterator();
- this.nameMapping =
table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
- this.caseSensitive = conf.getBoolean(InputFormatConfig.CASE_SENSITIVE,
InputFormatConfig.CASE_SENSITIVE_DEFAULT);
- this.expectedSchema = readSchema(conf, table, caseSensitive);
- this.reuseContainers =
conf.getBoolean(InputFormatConfig.REUSE_CONTAINERS, false);
- this.inMemoryDataModel =
conf.getEnum(InputFormatConfig.IN_MEMORY_DATA_MODEL,
- InputFormatConfig.InMemoryDataModel.GENERIC);
- this.fetchVirtualColumns = InputFormatConfig.fetchVirtualColumns(conf);
- this.currentIterator = nextTask();
- }
-
- private CloseableIterator<T> nextTask() {
- CloseableIterator<T> closeableIterator = open(tasks.next(),
expectedSchema).iterator();
- if (!fetchVirtualColumns || Utilities.getIsVectorized(conf)) {
- return closeableIterator;
- }
- return new
IcebergAcidUtil.VirtualColumnAwareIterator<T>(closeableIterator,
expectedSchema, conf);
- }
-
- @Override
- public boolean nextKeyValue() throws IOException {
- while (true) {
- if (currentIterator.hasNext()) {
- current = currentIterator.next();
- return true;
- } else if (tasks.hasNext()) {
- currentIterator.close();
- this.currentIterator = nextTask();
- } else {
- currentIterator.close();
- return false;
- }
- }
- }
-
- @Override
- public Void getCurrentKey() {
- return null;
- }
-
- @Override
- public T getCurrentValue() {
- return current;
- }
-
- @Override
- public float getProgress() {
- // TODO: We could give a more accurate progress based on records read
from the file. Context.getProgress does not
- // have enough information to give an accurate progress value. This
isn't that easy, since we don't know how much
- // of the input split has been processed and we are pushing filters into
Parquet and ORC. But we do know when a
- // file is opened and could count the number of rows returned, so we can
estimate. And we could also add a row
- // count to the readers so that we can get an accurate count of rows
that have been either returned or filtered
- // out.
- return context.getProgress();
- }
-
- @Override
- public void close() throws IOException {
- currentIterator.close();
- }
-
- private CloseableIterable<T> openVectorized(FileScanTask task, Schema
readSchema) {
-
Preconditions.checkArgument(!task.file().format().equals(FileFormat.AVRO),
- "Vectorized execution is not yet supported for Iceberg avro tables.
" +
- "Please turn off vectorization and retry the query.");
- Preconditions.checkArgument(HiveVersion.min(HiveVersion.HIVE_3),
- "Vectorized read is unsupported for Hive 2 integration.");
-
- Path path = new Path(task.file().path().toString());
- Map<Integer, ?> idToConstant = constantsMap(task,
HiveIdentityPartitionConverters::convertConstant);
- Expression residual = HiveIcebergInputFormat.residualForTask(task,
context.getConfiguration());
-
- // TODO: We have to take care of the EncryptionManager when LLAP and
vectorization is used
- CloseableIterable<T> iterator =
HIVE_VECTORIZED_READER_BUILDER.invoke(table, path, task,
- idToConstant, context, residual, readSchema);
-
- return applyResidualFiltering(iterator, residual, readSchema);
- }
-
- private CloseableIterable<T> openGeneric(FileScanTask task, Schema
readSchema) {
- if (task.isDataTask()) {
- // When querying metadata tables, the currentTask is a DataTask and
the data has to
- // be fetched from the task instead of reading it from files.
- IcebergInternalRecordWrapper wrapper =
- new IcebergInternalRecordWrapper(table.schema().asStruct(),
readSchema.asStruct());
- return (CloseableIterable) CloseableIterable.transform(((DataTask)
task).rows(), row -> wrapper.wrap(row));
- }
-
- DataFile file = task.file();
- InputFile inputFile =
table.encryption().decrypt(EncryptedFiles.encryptedInput(
- table.io().newInputFile(file.path().toString()),
- file.keyMetadata()));
-
- CloseableIterable<T> iterable;
- switch (file.format()) {
- case AVRO:
- iterable = newAvroIterable(inputFile, task, readSchema);
- break;
- case ORC:
- iterable = newOrcIterable(inputFile, task, readSchema);
- break;
- case PARQUET:
- iterable = newParquetIterable(inputFile, task, readSchema);
- break;
- default:
- throw new UnsupportedOperationException(
- String.format("Cannot read %s file: %s", file.format().name(),
file.path()));
- }
-
- return iterable;
- }
-
- @SuppressWarnings("unchecked")
- private CloseableIterable<T> open(FileScanTask currentTask, Schema
readSchema) {
- switch (inMemoryDataModel) {
- case PIG:
- // TODO: Support Pig and Hive object models for IcebergInputFormat
- throw new UnsupportedOperationException("Pig and Hive object models
are not supported.");
- case HIVE:
- return openVectorized(currentTask, readSchema);
- case GENERIC:
- DeleteFilter deletes = new GenericDeleteFilter(table.io(),
currentTask, table.schema(), readSchema);
- Schema requiredSchema = deletes.requiredSchema();
- return deletes.filter(openGeneric(currentTask, requiredSchema));
- default:
- throw new UnsupportedOperationException("Unsupported memory model");
- }
- }
-
- private CloseableIterable<T> applyResidualFiltering(CloseableIterable<T>
iter, Expression residual,
- Schema readSchema) {
- boolean applyResidual =
!context.getConfiguration().getBoolean(InputFormatConfig.SKIP_RESIDUAL_FILTERING,
false);
-
- if (applyResidual && residual != null && residual !=
Expressions.alwaysTrue()) {
- // Date and timestamp values are not the correct type for Evaluator.
- // Wrapping to return the expected type.
- InternalRecordWrapper wrapper = new
InternalRecordWrapper(readSchema.asStruct());
- Evaluator filter = new Evaluator(readSchema.asStruct(), residual,
caseSensitive);
- return CloseableIterable.filter(iter, record ->
filter.eval(wrapper.wrap((StructLike) record)));
- } else {
- return iter;
- }
- }
-
- private CloseableIterable<T> newAvroIterable(
- InputFile inputFile, FileScanTask task, Schema readSchema) {
- Expression residual = HiveIcebergInputFormat.residualForTask(task,
context.getConfiguration());
- Avro.ReadBuilder avroReadBuilder = Avro.read(inputFile)
- .project(readSchema)
- .split(task.start(), task.length());
-
- if (reuseContainers) {
- avroReadBuilder.reuseContainers();
- }
-
- if (nameMapping != null) {
-
avroReadBuilder.withNameMapping(NameMappingParser.fromJson(nameMapping));
- }
-
- avroReadBuilder.createReaderFunc(
- (expIcebergSchema, expAvroSchema) ->
- DataReader.create(expIcebergSchema, expAvroSchema,
- constantsMap(task,
IdentityPartitionConverters::convertConstant)));
-
- return applyResidualFiltering(avroReadBuilder.build(), residual,
readSchema);
- }
-
- private CloseableIterable<T> newParquetIterable(InputFile inputFile,
FileScanTask task, Schema readSchema) {
- Expression residual = HiveIcebergInputFormat.residualForTask(task,
context.getConfiguration());
-
- Parquet.ReadBuilder parquetReadBuilder = Parquet.read(inputFile)
- .project(readSchema)
- .filter(residual)
- .caseSensitive(caseSensitive)
- .split(task.start(), task.length());
-
- if (reuseContainers) {
- parquetReadBuilder.reuseContainers();
- }
-
- if (nameMapping != null) {
-
parquetReadBuilder.withNameMapping(NameMappingParser.fromJson(nameMapping));
- }
-
- parquetReadBuilder.createReaderFunc(
- fileSchema -> GenericParquetReaders.buildReader(
- readSchema, fileSchema, constantsMap(task,
IdentityPartitionConverters::convertConstant)));
-
- return applyResidualFiltering(parquetReadBuilder.build(), residual,
readSchema);
- }
-
- private CloseableIterable<T> newOrcIterable(InputFile inputFile,
FileScanTask task, Schema readSchema) {
- Map<Integer, ?> idToConstant = constantsMap(task,
IdentityPartitionConverters::convertConstant);
- Schema readSchemaWithoutConstantAndMetadataFields =
schemaWithoutConstantsAndMeta(readSchema, idToConstant);
- Expression residual = HiveIcebergInputFormat.residualForTask(task,
context.getConfiguration());
-
- ORC.ReadBuilder orcReadBuilder = ORC.read(inputFile)
- .project(readSchemaWithoutConstantAndMetadataFields)
- .filter(residual)
- .caseSensitive(caseSensitive)
- .split(task.start(), task.length());
-
- if (nameMapping != null) {
-
orcReadBuilder.withNameMapping(NameMappingParser.fromJson(nameMapping));
- }
-
- orcReadBuilder.createReaderFunc(
- fileSchema -> GenericOrcReader.buildReader(
- readSchema, fileSchema, idToConstant));
-
- return applyResidualFiltering(orcReadBuilder.build(), residual,
readSchema);
- }
-
- private Map<Integer, ?> constantsMap(FileScanTask task, BiFunction<Type,
Object, Object> converter) {
- PartitionSpec spec = task.spec();
- Set<Integer> idColumns = spec.identitySourceIds();
- Schema partitionSchema = TypeUtil.select(expectedSchema, idColumns);
- boolean projectsIdentityPartitionColumns =
!partitionSchema.columns().isEmpty();
- if (expectedSchema.findField(MetadataColumns.PARTITION_COLUMN_ID) !=
null) {
- Types.StructType partitionType = Partitioning.partitionType(table);
- return PartitionUtil.constantsMap(task, partitionType, converter);
- } else if (projectsIdentityPartitionColumns) {
- Types.StructType partitionType = Partitioning.partitionType(table);
- return PartitionUtil.constantsMap(task, partitionType, converter);
- } else {
- return Collections.emptyMap();
- }
- }
-
- private static Schema readSchema(Configuration conf, Table table, boolean
caseSensitive) {
- Schema readSchema = InputFormatConfig.readSchema(conf);
-
- if (readSchema != null) {
- return readSchema;
- }
-
- String[] selectedColumns = InputFormatConfig.selectedColumns(conf);
- readSchema = table.schema();
-
- if (selectedColumns != null) {
- readSchema =
- caseSensitive ? readSchema.select(selectedColumns) :
readSchema.caseInsensitiveSelect(selectedColumns);
- }
-
- if (InputFormatConfig.fetchVirtualColumns(conf)) {
- return
IcebergAcidUtil.createFileReadSchemaWithVirtualColums(readSchema.columns(),
table);
- }
-
- return readSchema;
- }
-
- private static Schema schemaWithoutConstantsAndMeta(Schema readSchema,
Map<Integer, ?> idToConstant) {
- // remove the nested fields of the partition struct
- Set<Integer> partitionFields =
Optional.ofNullable(readSchema.findField(MetadataColumns.PARTITION_COLUMN_ID))
- .map(Types.NestedField::type)
- .map(Type::asStructType)
- .map(Types.StructType::fields)
- .map(fields ->
fields.stream().map(Types.NestedField::fieldId).collect(Collectors.toSet()))
- .orElseGet(Collections::emptySet);
-
- // remove constants and meta columns too
- Set<Integer> collect = Stream.of(idToConstant.keySet(),
MetadataColumns.metadataFieldIds(), partitionFields)
- .flatMap(Set::stream)
- .collect(Collectors.toSet());
-
- return TypeUtil.selectNot(readSchema, collect);
- }
- }
}
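For clarity on the dispatch introduced in createRecordReader above, a hedged usage sketch; the split and taskAttemptContext variables are assumptions and checked-exception handling is omitted.

    // Sketch: the reader type follows the split type, so merge splits (one whole content
    // file) and regular Iceberg splits (scan tasks) share the same input format.
    IcebergInputFormat<Record> inputFormat = new IcebergInputFormat<>();
    RecordReader<Void, Record> reader = inputFormat.createRecordReader(split, taskAttemptContext);
    reader.initialize(split, taskAttemptContext);  // IcebergMergeRecordReader when split is an IcebergMergeSplit
    while (reader.nextKeyValue()) {
      Record record = reader.getCurrentValue();
      // process the record
    }
    reader.close();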
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergMergeRecordReader.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergMergeRecordReader.java
new file mode 100644
index 00000000000..60995a1988d
--- /dev/null
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergMergeRecordReader.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.mapreduce;
+
+import java.io.IOException;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.orc.GenericOrcReader;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.encryption.EncryptedFiles;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.mapping.NameMappingParser;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+public final class IcebergMergeRecordReader<T> extends AbstractIcebergRecordReader<T> {
+ private IcebergMergeSplit mergeSplit;
+ private CloseableIterator<T> currentIterator;
+ private T current;
+
+ @Override
+ public void initialize(InputSplit split, TaskAttemptContext newContext) {
+ // For now IcebergInputFormat does its own split planning and does not accept FileSplit instances
+ super.initialize(split, newContext);
+ mergeSplit = (IcebergMergeSplit) split;
+ this.currentIterator = nextTask();
+ }
+
+ private CloseableIterator<T> nextTask() {
+ return openGeneric(mergeSplit.getContentFile(), getTable().schema()).iterator();
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException {
+ while (true) {
+ if (currentIterator.hasNext()) {
+ current = currentIterator.next();
+ return true;
+ } else {
+ currentIterator.close();
+ return false;
+ }
+ }
+ }
+
+ @Override
+ public T getCurrentValue() {
+ return current;
+ }
+
+ @Override
+ public void close() throws IOException {
+ currentIterator.close();
+ }
+
+ private CloseableIterable<T> openGeneric(ContentFile contentFile, Schema readSchema) {
+ InputFile inputFile = null;
+ Schema schema = null;
+ if (contentFile instanceof DataFile) {
+ DataFile dataFile = (DataFile) contentFile;
+ inputFile = getTable().encryption().decrypt(EncryptedFiles.encryptedInput(
+ getTable().io().newInputFile(dataFile.path().toString()),
+ dataFile.keyMetadata()));
+ schema = readSchema;
+ }
+ CloseableIterable<T> iterable;
+ switch (contentFile.format()) {
+ case AVRO:
+ iterable = newAvroIterable(inputFile, schema);
+ break;
+ case ORC:
+ iterable = newOrcIterable(inputFile, schema);
+ break;
+ case PARQUET:
+ iterable = newParquetIterable(inputFile, schema);
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ String.format("Cannot read %s file: %s",
contentFile.format().name(), contentFile.path()));
+ }
+
+ return iterable;
+ }
+
+ private CloseableIterable<T> newAvroIterable(
+ InputFile inputFile, Schema readSchema) {
+ Avro.ReadBuilder avroReadBuilder = Avro.read(inputFile)
+ .project(readSchema)
+ .split(mergeSplit.getStart(), mergeSplit.getLength());
+
+ if (isReuseContainers()) {
+ avroReadBuilder.reuseContainers();
+ }
+
+ if (getNameMapping() != null) {
+ avroReadBuilder.withNameMapping(NameMappingParser.fromJson(getNameMapping()));
+ }
+
+ avroReadBuilder.createReaderFunc(
+ (expIcebergSchema, expAvroSchema) -> DataReader.create(expIcebergSchema, expAvroSchema, Maps.newHashMap()));
+
+ return applyResidualFiltering(avroReadBuilder.build(), null, readSchema);
+ }
+
+ private CloseableIterable<T> newOrcIterable(InputFile inputFile, Schema readSchema) {
+ ORC.ReadBuilder orcReadBuilder = ORC.read(inputFile)
+ .project(readSchema)
+ .caseSensitive(isCaseSensitive())
+ .split(mergeSplit.getStart(), mergeSplit.getLength());
+
+ if (getNameMapping() != null) {
+ orcReadBuilder.withNameMapping(NameMappingParser.fromJson(getNameMapping()));
+ }
+
+ orcReadBuilder.createReaderFunc(
+ fileSchema -> GenericOrcReader.buildReader(
+ readSchema, fileSchema, Maps.newHashMap()));
+
+ return applyResidualFiltering(orcReadBuilder.build(), null, readSchema);
+ }
+
+ private CloseableIterable<T> newParquetIterable(InputFile inputFile, Schema readSchema) {
+
+ Parquet.ReadBuilder parquetReadBuilder = Parquet.read(inputFile)
+ .project(readSchema)
+ .caseSensitive(isCaseSensitive())
+ .split(mergeSplit.getStart(), mergeSplit.getLength());
+
+ if (isReuseContainers()) {
+ parquetReadBuilder.reuseContainers();
+ }
+
+ if (getNameMapping() != null) {
+ parquetReadBuilder.withNameMapping(NameMappingParser.fromJson(getNameMapping()));
+ }
+
+ parquetReadBuilder.createReaderFunc(
+ fileSchema -> GenericParquetReaders.buildReader(
+ readSchema, fileSchema, Maps.newHashMap()));
+
+ return applyResidualFiltering(parquetReadBuilder.build(), null, readSchema);
+ }
+}
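For illustration only (not part of the patch): a minimal sketch of how the new IcebergMergeRecordReader is driven through the standard mapreduce RecordReader contract. It assumes the generic Iceberg data model (org.apache.iceberg.data.Record) and that the IcebergMergeSplit and TaskAttemptContext come from the surrounding merge task; the class and method names below are hypothetical.

import java.io.IOException;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.mr.mapreduce.IcebergMergeRecordReader;
import org.apache.iceberg.mr.mapreduce.IcebergMergeSplit;

class MergeReaderSketch {
  // Hypothetical driver loop: stream every row of the single file behind the split.
  static void copyRows(IcebergMergeSplit split, TaskAttemptContext context) throws IOException {
    IcebergMergeRecordReader<Record> reader = new IcebergMergeRecordReader<>();
    reader.initialize(split, context);           // opens the split's ContentFile with the table schema
    try {
      while (reader.nextKeyValue()) {            // false once the file is exhausted
        Record row = reader.getCurrentValue();
        // hand `row` to whatever writer produces the merged data file
      }
    } finally {
      reader.close();
    }
  }
}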
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplit.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergMergeSplit.java
similarity index 50%
copy from iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplit.java
copy to iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergMergeSplit.java
index 420bf9ea3b1..64a4b6737be 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplit.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergMergeSplit.java
@@ -22,65 +22,46 @@ package org.apache.iceberg.mr.mapreduce;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.iceberg.CombinedScanTask;
-import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.hadoop.Util;
-import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.iceberg.ContentFile;
import org.apache.iceberg.util.SerializationUtil;
-// Since this class extends `mapreduce.InputSplit and implements `mapred.InputSplit`, it can be returned by both MR v1
-// and v2 file formats.
-public class IcebergSplit extends InputSplit implements org.apache.hadoop.mapred.InputSplit, IcebergSplitContainer {
-
- public static final String[] ANYWHERE = new String[]{"*"};
-
- private CombinedScanTask task;
-
- private transient String[] locations;
+public class IcebergMergeSplit extends FileSplit implements org.apache.hadoop.mapred.InputSplit {
private transient Configuration conf;
+ private ContentFile contentFile;
// public no-argument constructor for deserialization
- public IcebergSplit() {
+ public IcebergMergeSplit() {
}
- IcebergSplit(Configuration conf, CombinedScanTask task) {
- this.task = task;
+ public IcebergMergeSplit(Configuration conf,
+ CombineHiveInputFormat.CombineHiveInputSplit split,
+ Integer partition, Properties properties) throws IOException {
+ super(split.getPaths()[partition], split
+ .getStartOffsets()[partition], split.getLengths()[partition], split
+ .getLocations());
this.conf = conf;
- }
-
- public CombinedScanTask task() {
- return task;
- }
-
- @Override
- public IcebergSplit icebergSplit() {
- return this;
+ Path path = split.getPaths()[partition];
+ contentFile = (ContentFile) properties.get(path);
}
@Override
public long getLength() {
- return task.files().stream().mapToLong(FileScanTask::length).sum();
+ return contentFile.fileSizeInBytes();
}
@Override
public String[] getLocations() {
- // The implementation of getLocations() is only meant to be used during split computation
- // getLocations() won't be accurate when called on worker nodes and will always return "*"
- if (locations == null && conf != null) {
- boolean localityPreferred = conf.getBoolean(InputFormatConfig.LOCALITY, false);
- locations = localityPreferred ? Util.blockLocations(task, conf) : ANYWHERE;
- } else {
- locations = ANYWHERE;
- }
-
- return locations;
+ return new String[]{"*"};
}
@Override
public void write(DataOutput out) throws IOException {
- byte[] data = SerializationUtil.serializeToBytes(this.task);
+ byte[] data = SerializationUtil.serializeToBytes(this.contentFile);
out.writeInt(data.length);
out.write(data);
}
@@ -89,6 +70,10 @@ public class IcebergSplit extends InputSplit implements org.apache.hadoop.mapred
public void readFields(DataInput in) throws IOException {
byte[] data = new byte[in.readInt()];
in.readFully(data);
- this.task = SerializationUtil.deserializeFromBytes(data);
+ this.contentFile = SerializationUtil.deserializeFromBytes(data);
+ }
+
+ public ContentFile getContentFile() {
+ return contentFile;
}
}
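Also for illustration: the new constructor recovers its ContentFile with properties.get(path), so whatever populates MapWork's mergeSplitProperties is expected to map each file Path to its Iceberg ContentFile. A hypothetical sketch of that contract (the helper class and method names are made up):

import java.util.Properties;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.DataFile;

class MergeSplitPropertiesSketch {
  // Hypothetical: build the Path -> ContentFile map that the IcebergMergeSplit constructor looks up.
  static Properties forFiles(Iterable<DataFile> dataFiles) {
    Properties props = new Properties();
    for (DataFile file : dataFiles) {
      props.put(new Path(file.path().toString()), file);   // same key shape as split.getPaths()[partition]
    }
    return props;
  }
}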
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergRecordReader.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergRecordReader.java
new file mode 100644
index 00000000000..0c5b719aaa9
--- /dev/null
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergRecordReader.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.mapreduce;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataTask;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.common.DynMethods;
+import org.apache.iceberg.data.DeleteFilter;
+import org.apache.iceberg.data.GenericDeleteFilter;
+import org.apache.iceberg.data.IdentityPartitionConverters;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.orc.GenericOrcReader;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.encryption.EncryptedFiles;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.hive.HiveVersion;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.mapping.NameMappingParser;
+import org.apache.iceberg.mr.hive.HiveIcebergInputFormat;
+import org.apache.iceberg.mr.hive.IcebergAcidUtil;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PartitionUtil;
+
+public final class IcebergRecordReader<T> extends AbstractIcebergRecordReader<T> {
+
+ private static final String HIVE_VECTORIZED_READER_CLASS = "org.apache.iceberg.mr.hive.vector.HiveVectorizedReader";
+ private static final DynMethods.StaticMethod HIVE_VECTORIZED_READER_BUILDER;
+
+ static {
+ if (HiveVersion.min(HiveVersion.HIVE_3)) {
+ HIVE_VECTORIZED_READER_BUILDER = DynMethods.builder("reader")
+ .impl(HIVE_VECTORIZED_READER_CLASS,
+ Table.class,
+ Path.class,
+ FileScanTask.class,
+ Map.class,
+ TaskAttemptContext.class,
+ Expression.class,
+ Schema.class)
+ .buildStatic();
+ } else {
+ HIVE_VECTORIZED_READER_BUILDER = null;
+ }
+ }
+
+ private Iterator<FileScanTask> tasks;
+ private CloseableIterator<T> currentIterator;
+ private T current;
+
+ @Override
+ public void initialize(InputSplit split, TaskAttemptContext newContext) {
+ // For now IcebergInputFormat does its own split planning and does not accept FileSplit instances
+ super.initialize(split, newContext);
+ CombinedScanTask task = ((IcebergSplit) split).task();
+ this.tasks = task.files().iterator();
+ this.currentIterator = nextTask();
+ }
+
+ private CloseableIterator<T> nextTask() {
+ CloseableIterator<T> closeableIterator = open(tasks.next(), getExpectedSchema()).iterator();
+ if (!isFetchVirtualColumns() || Utilities.getIsVectorized(getConf())) {
+ return closeableIterator;
+ }
+ return new IcebergAcidUtil.VirtualColumnAwareIterator<T>(closeableIterator, getExpectedSchema(), getConf());
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException {
+ while (true) {
+ if (currentIterator.hasNext()) {
+ current = currentIterator.next();
+ return true;
+ } else if (tasks.hasNext()) {
+ currentIterator.close();
+ this.currentIterator = nextTask();
+ } else {
+ currentIterator.close();
+ return false;
+ }
+ }
+ }
+
+ @Override
+ public T getCurrentValue() {
+ return current;
+ }
+
+ @Override
+ public void close() throws IOException {
+ currentIterator.close();
+ }
+
+ private CloseableIterable<T> openVectorized(FileScanTask task, Schema readSchema) {
+ Preconditions.checkArgument(!task.file().format().equals(FileFormat.AVRO),
+ "Vectorized execution is not yet supported for Iceberg avro tables. " +
+ "Please turn off vectorization and retry the query.");
+ Preconditions.checkArgument(HiveVersion.min(HiveVersion.HIVE_3),
+ "Vectorized read is unsupported for Hive 2 integration.");
+
+ Path path = new Path(task.file().path().toString());
+ Map<Integer, ?> idToConstant = constantsMap(task, HiveIdentityPartitionConverters::convertConstant);
+ Expression residual = HiveIcebergInputFormat.residualForTask(task, getContext().getConfiguration());
+
+ // TODO: We have to take care of the EncryptionManager when LLAP and vectorization is used
+ CloseableIterable<T> iterator = HIVE_VECTORIZED_READER_BUILDER.invoke(getTable(), path, task,
+ idToConstant, getContext(), residual, readSchema);
+
+ return applyResidualFiltering(iterator, residual, readSchema);
+ }
+
+ private CloseableIterable<T> openGeneric(FileScanTask task, Schema readSchema) {
+ if (task.isDataTask()) {
+ // When querying metadata tables, the currentTask is a DataTask and the data has to
+ // be fetched from the task instead of reading it from files.
+ IcebergInternalRecordWrapper wrapper =
+ new IcebergInternalRecordWrapper(getTable().schema().asStruct(), readSchema.asStruct());
+ return (CloseableIterable) CloseableIterable.transform(((DataTask) task).rows(), row -> wrapper.wrap(row));
+ }
+
+ DataFile file = task.file();
+ InputFile inputFile = getTable().encryption().decrypt(EncryptedFiles.encryptedInput(
+ getTable().io().newInputFile(file.path().toString()),
+ file.keyMetadata()));
+
+ CloseableIterable<T> iterable;
+ switch (file.format()) {
+ case AVRO:
+ iterable = newAvroIterable(inputFile, task, readSchema);
+ break;
+ case ORC:
+ iterable = newOrcIterable(inputFile, task, readSchema);
+ break;
+ case PARQUET:
+ iterable = newParquetIterable(inputFile, task, readSchema);
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ String.format("Cannot read %s file: %s", file.format().name(),
file.path()));
+ }
+
+ return iterable;
+ }
+
+ @SuppressWarnings("unchecked")
+ private CloseableIterable<T> open(FileScanTask currentTask, Schema readSchema) {
+ switch (getInMemoryDataModel()) {
+ case PIG:
+ // TODO: Support Pig and Hive object models for IcebergInputFormat
+ throw new UnsupportedOperationException("Pig and Hive object models
are not supported.");
+ case HIVE:
+ return openVectorized(currentTask, readSchema);
+ case GENERIC:
+ DeleteFilter deletes = new GenericDeleteFilter(getTable().io(), currentTask, getTable().schema(), readSchema);
+ Schema requiredSchema = deletes.requiredSchema();
+ return deletes.filter(openGeneric(currentTask, requiredSchema));
+ default:
+ throw new UnsupportedOperationException("Unsupported memory model");
+ }
+ }
+
+ private CloseableIterable<T> newAvroIterable(
+ InputFile inputFile, FileScanTask task, Schema readSchema) {
+ Expression residual = HiveIcebergInputFormat.residualForTask(task, getContext().getConfiguration());
+ Avro.ReadBuilder avroReadBuilder = Avro.read(inputFile)
+ .project(readSchema)
+ .split(task.start(), task.length());
+
+ if (isReuseContainers()) {
+ avroReadBuilder.reuseContainers();
+ }
+
+ if (getNameMapping() != null) {
+ avroReadBuilder.withNameMapping(NameMappingParser.fromJson(getNameMapping()));
+ }
+
+ avroReadBuilder.createReaderFunc(
+ (expIcebergSchema, expAvroSchema) ->
+ DataReader.create(expIcebergSchema, expAvroSchema,
+ constantsMap(task, IdentityPartitionConverters::convertConstant)));
+
+ return applyResidualFiltering(avroReadBuilder.build(), residual, readSchema);
+ }
+
+ private CloseableIterable<T> newParquetIterable(InputFile inputFile, FileScanTask task, Schema readSchema) {
+ Expression residual = HiveIcebergInputFormat.residualForTask(task, getContext().getConfiguration());
+
+ Parquet.ReadBuilder parquetReadBuilder = Parquet.read(inputFile)
+ .project(readSchema)
+ .filter(residual)
+ .caseSensitive(isCaseSensitive())
+ .split(task.start(), task.length());
+
+ if (isReuseContainers()) {
+ parquetReadBuilder.reuseContainers();
+ }
+
+ if (getNameMapping() != null) {
+ parquetReadBuilder.withNameMapping(NameMappingParser.fromJson(getNameMapping()));
+ }
+
+ parquetReadBuilder.createReaderFunc(
+ fileSchema -> GenericParquetReaders.buildReader(
+ readSchema, fileSchema, constantsMap(task, IdentityPartitionConverters::convertConstant)));
+
+ return applyResidualFiltering(parquetReadBuilder.build(), residual, readSchema);
+ }
+
+ private CloseableIterable<T> newOrcIterable(InputFile inputFile, FileScanTask task, Schema readSchema) {
+ Map<Integer, ?> idToConstant = constantsMap(task, IdentityPartitionConverters::convertConstant);
+ Schema readSchemaWithoutConstantAndMetadataFields = schemaWithoutConstantsAndMeta(readSchema, idToConstant);
+ Expression residual = HiveIcebergInputFormat.residualForTask(task, getContext().getConfiguration());
+
+ ORC.ReadBuilder orcReadBuilder = ORC.read(inputFile)
+ .project(readSchemaWithoutConstantAndMetadataFields)
+ .filter(residual)
+ .caseSensitive(isCaseSensitive())
+ .split(task.start(), task.length());
+
+ if (getNameMapping() != null) {
+ orcReadBuilder.withNameMapping(NameMappingParser.fromJson(getNameMapping()));
+ }
+
+ orcReadBuilder.createReaderFunc(
+ fileSchema -> GenericOrcReader.buildReader(
+ readSchema, fileSchema, idToConstant));
+
+ return applyResidualFiltering(orcReadBuilder.build(), residual, readSchema);
+ }
+
+ private Map<Integer, ?> constantsMap(FileScanTask task, BiFunction<Type, Object, Object> converter) {
+ PartitionSpec spec = task.spec();
+ Set<Integer> idColumns = spec.identitySourceIds();
+ Schema partitionSchema = TypeUtil.select(getExpectedSchema(), idColumns);
+ boolean projectsIdentityPartitionColumns = !partitionSchema.columns().isEmpty();
+ if (getExpectedSchema().findField(MetadataColumns.PARTITION_COLUMN_ID) != null) {
+ Types.StructType partitionType = Partitioning.partitionType(getTable());
+ return PartitionUtil.constantsMap(task, partitionType, converter);
+ } else if (projectsIdentityPartitionColumns) {
+ Types.StructType partitionType = Partitioning.partitionType(getTable());
+ return PartitionUtil.constantsMap(task, partitionType, converter);
+ } else {
+ return Collections.emptyMap();
+ }
+ }
+
+ private static Schema schemaWithoutConstantsAndMeta(Schema readSchema, Map<Integer, ?> idToConstant) {
+ // remove the nested fields of the partition struct
+ Set<Integer> partitionFields = Optional.ofNullable(readSchema.findField(MetadataColumns.PARTITION_COLUMN_ID))
+ .map(Types.NestedField::type)
+ .map(Type::asStructType)
+ .map(Types.StructType::fields)
+ .map(fields -> fields.stream().map(Types.NestedField::fieldId).collect(Collectors.toSet()))
+ .orElseGet(Collections::emptySet);
+
+ // remove constants and meta columns too
+ Set<Integer> collect = Stream.of(idToConstant.keySet(), MetadataColumns.metadataFieldIds(), partitionFields)
+ .flatMap(Set::stream)
+ .collect(Collectors.toSet());
+
+ return TypeUtil.selectNot(readSchema, collect);
+ }
+}
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplit.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplit.java
index 420bf9ea3b1..395ea35fe47 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplit.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplit.java
@@ -32,7 +32,7 @@ import org.apache.iceberg.util.SerializationUtil;
// Since this class extends `mapreduce.InputSplit and implements `mapred.InputSplit`, it can be returned by both MR v1
// and v2 file formats.
-public class IcebergSplit extends InputSplit implements org.apache.hadoop.mapred.InputSplit, IcebergSplitContainer {
+public class IcebergSplit extends InputSplit implements IcebergSplitContainer {
public static final String[] ANYWHERE = new String[]{"*"};
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplitContainer.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplitContainer.java
index c77543763b1..b989444957c 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplitContainer.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplitContainer.java
@@ -19,7 +19,9 @@
package org.apache.iceberg.mr.mapreduce;
-public interface IcebergSplitContainer {
+import org.apache.hadoop.mapred.InputSplit;
+
+public interface IcebergSplitContainer extends InputSplit {
IcebergSplit icebergSplit();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
index d057bf4f460..e5f407ee15c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
@@ -56,6 +57,7 @@ import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
@@ -368,18 +370,14 @@ public class CombineHiveInputFormat<K extends WritableComparable, V extends Writ
PartitionDesc part = HiveFileFormatUtils.getFromPathRecursively(
pathToPartitionInfo, path, IOPrepareCache.get().allocatePartitionDescMap());
TableDesc tableDesc = part.getTableDesc();
- if (tableDesc != null) {
- boolean useDefaultFileFormat = part.getInputFileFormatClass()
- .isAssignableFrom(tableDesc.getInputFileFormatClass());
- if (tableDesc.isNonNative() && useDefaultFileFormat) {
- return super.getSplits(job, numSplits);
- }
- }
-
// Use HiveInputFormat if any of the paths is not splittable
Class inputFormatClass = part.getInputFileFormatClass();
String inputFormatClassName = inputFormatClass.getName();
InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
+ if (tableDesc != null && tableDesc.isNonNative() && mrwork.getMergeSplitProperties() == null) {
+ return super.getSplits(job, numSplits);
+ }
+
String deserializerClassName = null;
try {
deserializerClassName = part.getDeserializer(job).getClass().getName();
@@ -765,4 +763,8 @@ public class CombineHiveInputFormat<K extends WritableComparable, V extends Writ
public interface AvoidSplitCombination {
boolean shouldSkipCombine(Path path, Configuration conf) throws IOException;
}
+
+ public interface MergeSplits extends AvoidSplitCombination {
+ FileSplit createMergeSplit(Configuration conf, CombineHiveInputSplit split, Integer partition, Properties splitProperties) throws IOException;
+ }
}
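A hedged sketch of how a non-native input format could opt into the new merge path: it keeps its splits combinable and turns each partition of a CombineHiveInputSplit back into a file-level split. Only the MergeSplits/AvoidSplitCombination signatures and the IcebergMergeSplit constructor come from this patch; the wrapper class below is hypothetical.

import java.io.IOException;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.iceberg.mr.mapreduce.IcebergMergeSplit;

class ExampleMergeAwareInputFormat implements CombineHiveInputFormat.MergeSplits {
  @Override
  public boolean shouldSkipCombine(Path path, Configuration conf) {
    return false;   // keep merge inputs eligible for CombineHiveInputFormat
  }

  @Override
  public FileSplit createMergeSplit(Configuration conf, CombineHiveInputFormat.CombineHiveInputSplit split,
      Integer partition, Properties splitProperties) throws IOException {
    // splitProperties is MapWork#getMergeSplitProperties(), i.e. the Path -> ContentFile map
    return new IcebergMergeSplit(conf, split, partition, splitProperties);
  }
}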
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java
index 7a687864cea..c0118d5e3d8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java
@@ -70,14 +70,12 @@ public class CombineHiveRecordReader<K extends WritableComparable, V extends Wri
+ inputFormatClassName);
}
InputFormat inputFormat = HiveInputFormat.getInputFormatFromCache(inputFormatClass, jobConf);
+ MapWork mrwork = null;
if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_IO_ENABLED, LlapProxy.isDaemon())) {
try {
// TODO : refactor this out
if (pathToPartInfo == null) {
- MapWork mrwork = (MapWork) Utilities.getMergeWork(jobConf);
- if (mrwork == null) {
- mrwork = Utilities.getMapWork(jobConf);
- }
+ mrwork = getMrWork(jobConf);
pathToPartInfo = mrwork.getPathToPartitionInfo();
}
@@ -88,14 +86,21 @@ public class CombineHiveRecordReader<K extends WritableComparable, V extends Wri
}
}
+ FileSplit inputSplit;
// create a split for the given partition
- FileSplit fsplit = new FileSplit(hsplit.getPaths()[partition], hsplit
- .getStartOffsets()[partition], hsplit.getLengths()[partition], hsplit
- .getLocations());
+ if (inputFormat instanceof CombineHiveInputFormat.MergeSplits) {
+ mrwork = getMrWork(jobConf);
+ inputSplit = ((CombineHiveInputFormat.MergeSplits) inputFormat).createMergeSplit(jobConf, hsplit, partition,
+ mrwork.getMergeSplitProperties());
+ } else {
+ inputSplit = new FileSplit(hsplit.getPaths()[partition], hsplit
+ .getStartOffsets()[partition], hsplit.getLengths()[partition], hsplit
+ .getLocations());
+ }
- this.setRecordReader(inputFormat.getRecordReader(fsplit, jobConf, reporter));
+ this.setRecordReader(inputFormat.getRecordReader(inputSplit, jobConf, reporter));
- this.initIOContext(fsplit, jobConf, inputFormatClass, this.recordReader);
+ this.initIOContext(inputSplit, jobConf, inputFormatClass, this.recordReader);
//If current split is from the same file as preceding split and the preceding split has footerbuffer,
//the current split should use the preceding split's footerbuffer in order to skip footer correctly.
@@ -127,6 +132,14 @@ public class CombineHiveRecordReader<K extends WritableComparable, V extends Wri
return part;
}
+ private MapWork getMrWork(JobConf jobConf) {
+ MapWork mrwork = (MapWork) Utilities.getMergeWork(jobConf);
+ if (mrwork == null) {
+ mrwork = Utilities.getMapWork(jobConf);
+ }
+ return mrwork;
+ }
+
@Override
public void doClose() throws IOException {
recordReader.close();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
index 05e54798840..4e6d4c3bcfe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
@@ -349,9 +349,6 @@ public class ConditionalResolverMergeFiles implements ConditionalResolver,
Utilities.FILE_OP_LOGGER.warn("merger ignoring invalid DP path " +
status[i].getPath());
continue;
}
- if (useCustomStorageHandler) {
- updatePartDescProperties(pDesc, mergeProperties);
- }
Utilities.FILE_OP_LOGGER.debug("merge resolver will merge " +
status[i].getPath());
work.resolveDynamicPartitionStoredAsSubDirsMerge(conf,
status[i].getPath(), tblDesc,
aliases, pDesc);
@@ -568,11 +565,11 @@ public class ConditionalResolverMergeFiles implements ConditionalResolver,
mapWork.setAliasToWork(aliasToWork);
}
if (partitionDesc != null) {
- updatePartDescProperties(partitionDesc, mergeProperties);
pathToPartitionInfo.remove(dirPath);
pathToPartitionInfo.put(tmpDir, partitionDesc);
mapWork.setPathToPartitionInfo(pathToPartitionInfo);
}
+ mapWork.setMergeSplitProperties(mergeProperties.getSplitProperties());
mapWork.removePathToAlias(dirPath);
mapWork.addPathToAlias(tmpDir, tmpDir.toString());
mapWork.setUseInputPathsDirectly(true);
@@ -593,22 +590,4 @@ public class ConditionalResolverMergeFiles implements ConditionalResolver,
}
return manifestDirsToPaths;
}
-
- private void updatePartDescProperties(PartitionDesc partitionDesc,
- MergeTaskProperties mergeProperties) throws IOException, ClassNotFoundException {
- if (mergeProperties != null) {
- String inputFileFormatClassName = mergeProperties.getStorageFormatDescriptor().getInputFormat();
- String outputFileFormatClassName = mergeProperties.getStorageFormatDescriptor().getOutputFormat();
- String serdeClassName = mergeProperties.getStorageFormatDescriptor().getSerde();
- if (inputFileFormatClassName != null) {
- partitionDesc.setInputFileFormatClass(JavaUtils.loadClass(inputFileFormatClassName));
- }
- if (outputFileFormatClassName != null) {
- partitionDesc.setOutputFileFormatClass(JavaUtils.loadClass(outputFileFormatClassName));
- }
- if (serdeClassName != null) {
- partitionDesc.getTableDesc().getProperties().setProperty(serdeConstants.SERIALIZATION_LIB, serdeClassName);
- }
- }
- }
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
index 076ef0a99b7..e705af31dea 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
@@ -22,18 +22,8 @@ import org.apache.hadoop.hive.common.StringInternUtils;
import org.apache.hadoop.hive.ql.exec.TableScanOperator.ProbeDecodeContext;
import org.apache.hadoop.hive.ql.exec.Utilities;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.BitSet;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.Map.Entry;
-import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -184,6 +174,8 @@ public class MapWork extends BaseWork {
private boolean useInputPathsDirectly;
+ private Properties mergeSplitProperties;
+
public MapWork() {}
public MapWork(String name) {
@@ -954,4 +946,12 @@ public class MapWork extends BaseWork {
public boolean isUseInputPathsDirectly() {
return useInputPathsDirectly;
}
+
+ public Properties getMergeSplitProperties() {
+ return mergeSplitProperties;
+ }
+
+ public void setMergeSplitProperties(Properties mergeSplitProperties) {
+ this.mergeSplitProperties = mergeSplitProperties;
+ }
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MergeTaskProperties.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MergeTaskProperties.java
index 6083b1b117f..fdf8451d273 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MergeTaskProperties.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MergeTaskProperties.java
@@ -19,12 +19,16 @@
package org.apache.hadoop.hive.ql.plan;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.io.StorageFormatDescriptor;
import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
public interface MergeTaskProperties {
public Path getTmpLocation();
- public StorageFormatDescriptor getStorageFormatDescriptor() throws IOException;
+ default Properties getSplitProperties() throws IOException {
+ return null;
+ }
}
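With the StorageFormatDescriptor accessor gone, an implementation of the slimmed-down interface only needs the temp location plus, optionally, the split properties. A minimal hypothetical implementation (the class and field names below are illustrative, not from this patch):

import java.io.IOException;
import java.util.Properties;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.plan.MergeTaskProperties;

class ExampleMergeTaskProperties implements MergeTaskProperties {
  private final Path tmpLocation;
  private final Properties splitProperties;

  ExampleMergeTaskProperties(Path tmpLocation, Properties splitProperties) {
    this.tmpLocation = tmpLocation;
    this.splitProperties = splitProperties;
  }

  @Override
  public Path getTmpLocation() {
    return tmpLocation;
  }

  @Override
  public Properties getSplitProperties() throws IOException {
    return splitProperties;   // the interface default returns null
  }
}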