RussellSpitzer commented on a change in pull request #3876:
URL: https://github.com/apache/iceberg/pull/3876#discussion_r787918497
##########
File path: data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java
##########
@@ -74,111 +74,151 @@ private TableMigrationUtil() {
public static List<DataFile> listPartition(Map<String, String> partition,
String uri, String format,
PartitionSpec spec, Configuration
conf, MetricsConfig metricsConfig,
NameMapping mapping) {
+ int parallelism = conf.getInt(PARQUET_READ_PARALLELISM, 1);
if (format.contains("avro")) {
- return listAvroPartition(partition, uri, spec, conf);
+ return listAvroPartition(partition, uri, spec, conf, parallelism);
} else if (format.contains("parquet")) {
- return listParquetPartition(partition, uri, spec, conf, metricsConfig,
mapping);
+ return listParquetPartition(partition, uri, spec, conf, metricsConfig,
mapping, parallelism);
} else if (format.contains("orc")) {
- return listOrcPartition(partition, uri, spec, conf, metricsConfig,
mapping);
+ return listOrcPartition(partition, uri, spec, conf, metricsConfig,
mapping, parallelism);
} else {
throw new UnsupportedOperationException("Unknown partition format: " +
format);
}
}
private static List<DataFile> listAvroPartition(Map<String, String>
partitionPath, String partitionUri,
- PartitionSpec spec,
Configuration conf) {
+ PartitionSpec spec,
Configuration conf, int parallelism) {
try {
Path partition = new Path(partitionUri);
FileSystem fs = partition.getFileSystem(conf);
- return Arrays.stream(fs.listStatus(partition, HIDDEN_PATH_FILTER))
- .filter(FileStatus::isFile)
- .map(stat -> {
- InputFile file =
HadoopInputFile.fromLocation(stat.getPath().toString(), conf);
- long rowCount = Avro.rowCount(file);
- Metrics metrics = new Metrics(rowCount, null, null, null, null);
- String partitionKey = spec.fields().stream()
+ List<FileStatus> fileStatus = Arrays.stream(fs.listStatus(partition,
HIDDEN_PATH_FILTER))
+ .filter(FileStatus::isFile)
+ .collect(Collectors.toList());
+ DataFile[] datafiles = new DataFile[fileStatus.size()];
+ Tasks.Builder<Integer> task = Tasks.range(fileStatus.size())
+ .stopOnFailure()
+ .throwFailureWhenFinished();
+
+ if (parallelism > 1) {
+ task.executeWith(Executors.newFixedThreadPool(parallelism));
+ }
+ task.run(index -> {
+ FileStatus stat = fileStatus.get(index);
+ InputFile file =
HadoopInputFile.fromLocation(stat.getPath().toString(), conf);
+ long rowCount = Avro.rowCount(file);
+ Metrics metrics = new Metrics(rowCount, null, null, null, null);
+ String partitionKey = spec.fields().stream()
.map(PartitionField::name)
.map(name -> String.format("%s=%s", name,
partitionPath.get(name)))
.collect(Collectors.joining("/"));
- return DataFiles.builder(spec)
+ datafiles[index] = DataFiles.builder(spec)
.withPath(stat.getPath().toString())
.withFormat("avro")
.withFileSizeInBytes(stat.getLen())
.withMetrics(metrics)
.withPartitionPath(partitionKey)
.build();
+ });
+
+ return Arrays.asList(datafiles);
- }).collect(Collectors.toList());
} catch (IOException e) {
throw new RuntimeException("Unable to list files in partition: " +
partitionUri, e);
}
+
}
private static List<DataFile> listParquetPartition(Map<String, String>
partitionPath, String partitionUri,
PartitionSpec spec,
Configuration conf,
- MetricsConfig
metricsSpec, NameMapping mapping) {
+ MetricsConfig
metricsSpec, NameMapping mapping, int parallelism) {
try {
Path partition = new Path(partitionUri);
FileSystem fs = partition.getFileSystem(conf);
-
- return Arrays.stream(fs.listStatus(partition, HIDDEN_PATH_FILTER))
- .filter(FileStatus::isFile)
- .map(stat -> {
- Metrics metrics;
- try {
- ParquetMetadata metadata = ParquetFileReader.readFooter(conf,
stat);
- metrics = ParquetUtil.footerMetrics(metadata, Stream.empty(),
metricsSpec, mapping);
- } catch (IOException e) {
- throw new RuntimeException("Unable to read the footer of the
parquet file: " +
+ List<FileStatus> fileStatus = Arrays.stream(fs.listStatus(partition,
HIDDEN_PATH_FILTER))
+ .filter(FileStatus::isFile)
+ .collect(Collectors.toList());
+ DataFile[] datafiles = new DataFile[fileStatus.size()];
+ Tasks.Builder<Integer> task = Tasks.range(fileStatus.size())
+ .stopOnFailure()
+ .throwFailureWhenFinished();
+
+ if (parallelism > 1) {
+ task.executeWith(Executors.newFixedThreadPool(parallelism));
+ }
+ task.run(index -> {
+ FileStatus stat = fileStatus.get(index);
+ Metrics metrics;
+ try {
+ InputFile file = HadoopInputFile.fromPath(stat.getPath(), conf);
+ metrics = ParquetUtil.fileMetrics(file, metricsSpec, mapping);
+ } catch (RuntimeIOException e) {
+ throw new RuntimeException("Unable to read the footer of the parquet
file: " +
stat.getPath(), e);
- }
- String partitionKey = spec.fields().stream()
+ }
+ String partitionKey = spec.fields().stream()
.map(PartitionField::name)
.map(name -> String.format("%s=%s", name,
partitionPath.get(name)))
.collect(Collectors.joining("/"));
- return DataFiles.builder(spec)
+ datafiles[index] = DataFiles.builder(spec)
.withPath(stat.getPath().toString())
.withFormat("parquet")
.withFileSizeInBytes(stat.getLen())
.withMetrics(metrics)
.withPartitionPath(partitionKey)
.build();
- }).collect(Collectors.toList());
+ });
+
+ return Arrays.asList(datafiles);
+
} catch (IOException e) {
throw new RuntimeException("Unable to list files in partition: " +
partitionUri, e);
}
+
}
private static List<DataFile> listOrcPartition(Map<String, String>
partitionPath, String partitionUri,
- PartitionSpec spec,
Configuration conf,
- MetricsConfig metricsSpec,
NameMapping mapping) {
+ PartitionSpec spec,
Configuration conf,
+ MetricsConfig
metricsSpec, NameMapping mapping, int parallelism) {
try {
Path partition = new Path(partitionUri);
FileSystem fs = partition.getFileSystem(conf);
-
- return Arrays.stream(fs.listStatus(partition, HIDDEN_PATH_FILTER))
- .filter(FileStatus::isFile)
- .map(stat -> {
- Metrics metrics =
OrcMetrics.fromInputFile(HadoopInputFile.fromPath(stat.getPath(), conf),
+ List<FileStatus> fileStatus = Arrays.stream(fs.listStatus(partition,
HIDDEN_PATH_FILTER))
+ .filter(FileStatus::isFile)
+ .collect(Collectors.toList());
+ DataFile[] datafiles = new DataFile[fileStatus.size()];
+ Tasks.Builder<Integer> task = Tasks.range(fileStatus.size())
+ .stopOnFailure()
+ .throwFailureWhenFinished();
+
+ if (parallelism > 1) {
+ task.executeWith(Executors.newFixedThreadPool(parallelism));
+ }
+ task.run(index -> {
+ FileStatus stat = fileStatus.get(index);
+ Metrics metrics =
OrcMetrics.fromInputFile(HadoopInputFile.fromPath(stat.getPath(), conf),
metricsSpec, mapping);
- String partitionKey = spec.fields().stream()
+ String partitionKey = spec.fields().stream()
.map(PartitionField::name)
.map(name -> String.format("%s=%s", name,
partitionPath.get(name)))
.collect(Collectors.joining("/"));
- return DataFiles.builder(spec)
+ datafiles[index] = DataFiles.builder(spec)
.withPath(stat.getPath().toString())
.withFormat("orc")
.withFileSizeInBytes(stat.getLen())
.withMetrics(metrics)
.withPartitionPath(partitionKey)
.build();
+ });
+
+ return Arrays.asList(datafiles);
- }).collect(Collectors.toList());
} catch (IOException e) {
throw new RuntimeException("Unable to list files in partition: " +
partitionUri, e);
}
+
Review comment:
Nit: new newline (the blank line added just above).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]