wgtmac commented on code in PR #1273:
URL: https://github.com/apache/parquet-mr/pull/1273#discussion_r1524117636
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -115,19 +108,49 @@ public class ParquetRewriter implements Closeable {
private ParquetMetadata meta = null;
// created_by information of current reader being processed
private String originalCreatedBy = "";
- // Unique created_by information from all input files
- private final Set<String> allOriginalCreatedBys = new HashSet<>();
// The index cache strategy
private final IndexCache.CacheStrategy indexCacheStrategy;
public ParquetRewriter(RewriteOptions options) throws IOException {
ParquetConfiguration conf = options.getParquetConfiguration();
OutputFile out = options.getParquetOutputFile();
- openInputFiles(options.getParquetInputFiles(), conf);
+ inputFiles.addAll(getFileReaders(options.getParquetInputFiles(), conf));
+ List<Queue<TransParquetFileReader>> inputFilesR =
options.getParquetInputFilesR()
+ .stream()
+ .map(x -> getFileReaders(x, conf))
+ .collect(Collectors.toList());
+ ensureSameSchema(inputFiles);
+ inputFilesR.forEach(this::ensureSameSchema);
Review Comment:
So this imposes a strict schema check here. Is it possible to loosen the
restriction so that the files on the right side can have different schemas, as
long as together they provide all columns? For example, each of the following
groups of right side files would qualify:
- `a:bigint,b:string`, `c:double,d:binary`
- `a:bigint`, `b:string`, `c:double`, `d:binary`
- `a:bigint,b:string,c:double,d:binary`
I know this would be complex. I'm just asking, and I'm not sure whether it is a
valid real-world use case.
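To illustrate what a looser check could look like, here is a minimal sketch. It
is not taken from the PR: `LooseSchemaCheck` and `missingColumns` are
hypothetical names, and each file schema is reduced to a plain set of column
names so the example compiles without Parquet on the classpath. It only
verifies that a group of files jointly covers the required columns. Type
compatibility and duplicated columns are deliberately left out.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper, not part of ParquetRewriter. Schemas are modelled as
// sets of column names purely for illustration.
final class LooseSchemaCheck {
  private LooseSchemaCheck() {}

  /** Returns the required columns that no file in the group provides. */
  static Set<String> missingColumns(List<Set<String>> fileSchemas, Set<String> required) {
    Set<String> missing = new LinkedHashSet<>(required);
    for (Set<String> schema : fileSchemas) {
      // Every column a file provides is no longer missing.
      missing.removeAll(schema);
    }
    return missing;
  }
}
```

A group would pass the loosened check when `missingColumns` returns an empty
set, which holds for all three groups listed above with `a`, `b`, `c`, `d` as
the required columns.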
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -137,16 +160,31 @@ public ParquetRewriter(RewriteOptions options) throws
IOException {
getPaths(schema, paths, null);
for (String col : pruneColumns) {
if (!paths.contains(col)) {
- LOG.warn("Input column name {} doesn't show up in the schema of file
{}", col, reader.getFile());
+ LOG.warn("Input column name {} doesn't show up in the schema", col);
}
}
Set<ColumnPath> prunePaths = convertToColumnPaths(pruneColumns);
schema = pruneColumnsInSchema(schema, prunePaths);
}
- this.descriptorsMap =
- schema.getColumns().stream().collect(Collectors.toMap(x ->
ColumnPath.get(x.getPath()), x -> x));
+ if (inputFilesR.isEmpty()) { // TODO: find a more suitable solution
+ this.descriptorsMap =
+ schema.getColumns().stream().collect(Collectors.toMap(x ->
ColumnPath.get(x.getPath()), x -> x));
+ } else {
+ this.descriptorsMap = schemaL.getColumns().stream()
Review Comment:
OK, now I can answer my own question: loose schema enforcement on the right
side may complicate `descriptorsMap` here.
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -813,7 +841,7 @@ private Type extractField(GroupType candidate, Type
targetField) {
} else {
Type tempField = extractField(field.asGroupType(), targetField);
if (tempField != null) {
- return tempField;
+ return new GroupType(candidate.getRepetition(), candidate.getName(),
tempField);
Review Comment:
Is this a separate bug fix?
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java:
##########
@@ -39,11 +36,20 @@
/**
* A set of options to create a ParquetRewriter.
+ *
+ * TODO find a place where to put a proper description of functionality as it
is not trivial:
+ * ParquetRewriter allows to stitch files with the same schema into a single
file.
+ * Note that ParquetRewriter also can be used for effectively stitch/joining
multiple parquet files with
+ * different schemas.
+ * You can provide the main input file group and multiple right side ones.
That is possible when:
+ * 1) the number of rows in the main and extra input groups are the same,
+ * 2) the ordering of rows in the main and extra input groups is the same.
*/
public class RewriteOptions {
private final ParquetConfiguration conf;
private final List<InputFile> inputFiles;
+ private final List<List<InputFile>> inputFilesR;
Review Comment:
Could we consolidate the internal member variables `inputFiles` and
`inputFilesR`? My rough idea is to model these files as `HorizontalInputFile`
and `VerticalInputFile`.
- VerticalInputFile: an `InputFile` for a single parquet file, which may have a
partial or full schema.
- HorizontalInputFile: one or more `VerticalInputFile`s with the same number of
rows but different schemas, which together form a logical wide file. It can
choose one `VerticalInputFile` to be the left side and the others to be the
right side in your join algorithm. If it contains only one `VerticalInputFile`,
this is what we have today.
WDYT?
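A rough sketch of how this model might look, under stated assumptions: the type
parameter `F` stands in for `InputFile` so the example compiles on its own, and
the accessors `left()`, `right()` and `isPlain()` are hypothetical names, as is
the convention that the first file is the left side. It is meant as a starting
point for discussion, not as a proposed final API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical model. F stands in for org.apache.parquet.io.InputFile.
final class VerticalInputFile<F> {
  private final F file;

  VerticalInputFile(F file) {
    this.file = file;
  }

  F getFile() {
    return file;
  }
}

// One or more vertical files that together form a logical wide file.
final class HorizontalInputFile<F> {
  private final List<VerticalInputFile<F>> parts;

  HorizontalInputFile(List<VerticalInputFile<F>> parts) {
    if (parts.isEmpty()) {
      throw new IllegalArgumentException("At least one file is required");
    }
    // Defensive copy so later changes to the caller's list have no effect.
    this.parts = Collections.unmodifiableList(new ArrayList<>(parts));
  }

  // Assumption: the first file acts as the left side of the join.
  VerticalInputFile<F> left() {
    return parts.get(0);
  }

  // All remaining files form the right side; empty for a single file.
  List<VerticalInputFile<F>> right() {
    return parts.subList(1, parts.size());
  }

  // True when there is nothing to join, i.e. today's behaviour.
  boolean isPlain() {
    return parts.size() == 1;
  }
}
```

With such a model, `RewriteOptions` would hold a single list of
`HorizontalInputFile`s instead of the separate `inputFiles` and `inputFilesR`
members.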
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java:
##########
@@ -18,10 +18,7 @@
*/
package org.apache.parquet.hadoop.rewrite;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
Review Comment:
Please avoid wildcard imports.
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -115,19 +108,49 @@ public class ParquetRewriter implements Closeable {
private ParquetMetadata meta = null;
// created_by information of current reader being processed
private String originalCreatedBy = "";
- // Unique created_by information from all input files
- private final Set<String> allOriginalCreatedBys = new HashSet<>();
// The index cache strategy
private final IndexCache.CacheStrategy indexCacheStrategy;
public ParquetRewriter(RewriteOptions options) throws IOException {
ParquetConfiguration conf = options.getParquetConfiguration();
OutputFile out = options.getParquetOutputFile();
- openInputFiles(options.getParquetInputFiles(), conf);
+ inputFiles.addAll(getFileReaders(options.getParquetInputFiles(), conf));
+ List<Queue<TransParquetFileReader>> inputFilesR =
options.getParquetInputFilesR()
+ .stream()
+ .map(x -> getFileReaders(x, conf))
+ .collect(Collectors.toList());
+ ensureSameSchema(inputFiles);
+ inputFilesR.forEach(this::ensureSameSchema);
LOG.info("Start rewriting {} input file(s) {} to {}", inputFiles.size(),
options.getParquetInputFiles(), out);
- // Init reader of the first input file
- initNextReader();
+ extraMetaData.put(
+ ORIGINAL_CREATED_BY_KEY,
+ Stream.concat(inputFiles.stream(),
inputFilesR.stream().flatMap(Collection::stream))
+ .map(x -> x.getFooter().getFileMetaData().getCreatedBy())
+ .collect(Collectors.toSet())
+ .stream()
+ .reduce((a, b) -> a + "\n" + b)
+ .orElse("")
+ );
+ Stream.concat(inputFiles.stream(),
inputFilesR.stream().flatMap(Collection::stream))
+ .forEach(x ->
extraMetaData.putAll(x.getFileMetaData().getKeyValueMetaData()));
+
+ // TODO check that schema on the left and on the right is not identical
Review Comment:
nit: the schema check is now complex enough that it is worth a separate method.
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java:
##########
@@ -110,6 +118,27 @@ public List<Path> getInputFiles() {
.collect(Collectors.toList());
}
+ /** TODO fix documentation after addition of inputFilesR
+ * Gets the right input {@link Path}s for the rewrite if they exist for all
input files,
+ * otherwise throws a {@link RuntimeException}.
+ *
+ * @return a {@link List} of the associated right input {@link Path}s
+ */
+ public List<List<Path>> getInputFilesR() {
Review Comment:
I'd suggest removing this method, as it exposes a Hadoop dependency. We might
need to stick to `InputFile` as much as possible.
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -115,19 +108,49 @@ public class ParquetRewriter implements Closeable {
private ParquetMetadata meta = null;
// created_by information of current reader being processed
private String originalCreatedBy = "";
- // Unique created_by information from all input files
- private final Set<String> allOriginalCreatedBys = new HashSet<>();
// The index cache strategy
private final IndexCache.CacheStrategy indexCacheStrategy;
public ParquetRewriter(RewriteOptions options) throws IOException {
ParquetConfiguration conf = options.getParquetConfiguration();
OutputFile out = options.getParquetOutputFile();
- openInputFiles(options.getParquetInputFiles(), conf);
+ inputFiles.addAll(getFileReaders(options.getParquetInputFiles(), conf));
+ List<Queue<TransParquetFileReader>> inputFilesR =
options.getParquetInputFilesR()
+ .stream()
+ .map(x -> getFileReaders(x, conf))
+ .collect(Collectors.toList());
+ ensureSameSchema(inputFiles);
+ inputFilesR.forEach(this::ensureSameSchema);
LOG.info("Start rewriting {} input file(s) {} to {}", inputFiles.size(),
options.getParquetInputFiles(), out);
- // Init reader of the first input file
- initNextReader();
+ extraMetaData.put(
Review Comment:
I'm skeptical of the importance of metadata from the right side files.
However, I don't have a better suggestion here.
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -904,4 +932,208 @@ public byte[] getDictPageAAD() {
return this.dictPageAAD;
}
}
+
+ private void processBlocksFromReaderWithStitching(IndexCache indexCache)
throws IOException {
+ // TODO add the test for empty files joins, it should merge schemas
+ LOG.info("Rewriting input fileLeft: {}, remaining filesLeft: {}",
reader.getFile(), inputFiles.size());
+ int rowGroupIdx = 0;
+ List<BlockMetaData> blocks = reader.getFooter().getBlocks();
+ for (BlockMetaData blockMetaData: blocks) {
+ writer.startBlock(blockMetaData.getRowCount());
+
+ // Writing the left side
+ indexCache.setBlockMetadata(blockMetaData);
+ List<ColumnChunkMetaData> chunksL = blockMetaData.getColumns();
+ for (ColumnChunkMetaData chunk : chunksL) {
+ if (chunk.isEncrypted()) { // TODO add that detail to docs
+ throw new IOException("Column " + chunk.getPath().toDotString() + "
is encrypted");
+ }
+ ColumnDescriptor descriptorL = descriptorsMap.get(chunk.getPath());
+ if (descriptorL != null) { // descriptorL might be NULL if a column is
from the right side of a join
+ reader.setStreamPosition(chunk.getStartingPos());
+ BloomFilter bloomFilter = indexCache.getBloomFilter(chunk);
+ ColumnIndex columnIndex = indexCache.getColumnIndex(chunk);
+ OffsetIndex offsetIndex = indexCache.getOffsetIndex(chunk);
+ writer.appendColumnChunk(descriptorL, reader.getStream(), chunk,
bloomFilter, columnIndex, offsetIndex);
+ }
+ }
+
+ // Writing the right side
+ for (RightColumnWriter writer: columnWritersR) {
Review Comment:
Is it possible to move this into `processBlocksFromReader`?
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -137,16 +160,31 @@ public ParquetRewriter(RewriteOptions options) throws
IOException {
getPaths(schema, paths, null);
for (String col : pruneColumns) {
if (!paths.contains(col)) {
- LOG.warn("Input column name {} doesn't show up in the schema of file
{}", col, reader.getFile());
+ LOG.warn("Input column name {} doesn't show up in the schema", col);
}
}
Set<ColumnPath> prunePaths = convertToColumnPaths(pruneColumns);
schema = pruneColumnsInSchema(schema, prunePaths);
}
- this.descriptorsMap =
- schema.getColumns().stream().collect(Collectors.toMap(x ->
ColumnPath.get(x.getPath()), x -> x));
+ if (inputFilesR.isEmpty()) { // TODO: find a more suitable solution
+ this.descriptorsMap =
+ schema.getColumns().stream().collect(Collectors.toMap(x ->
ColumnPath.get(x.getPath()), x -> x));
+ } else {
+ this.descriptorsMap = schemaL.getColumns().stream()
+ .filter(x -> x.getPath().length == 0 ||
!fieldNamesR.containsKey(x.getPath()[0]))
+ .collect(Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));
+ }
+
+ long rowCountL =
inputFiles.stream().mapToLong(ParquetFileReader::getRecordCount).sum();
+ inputFilesR.stream()
+ .map(x ->
x.stream().mapToLong(ParquetFileReader::getRecordCount).sum())
+ .forEach(rowCountR -> {
+ if (rowCountL != rowCountR) {
+ throw new IllegalArgumentException("The number of records on the
left and on the right don't match!");
Review Comment:
nit: print the row counts in the error message.
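For instance, something along these lines would do. This is a sketch only:
`RowCountCheck` and `ensureSameRowCount` are hypothetical names used to show
the message format, and the exact wording is up to you.

```java
// Hypothetical helper that illustrates the error message only.
final class RowCountCheck {
  private RowCountCheck() {}

  static void ensureSameRowCount(long rowCountL, long rowCountR) {
    if (rowCountL != rowCountR) {
      // Include both counts so the mismatch can be diagnosed from the log.
      throw new IllegalArgumentException(String.format(
          "The number of records on the left (%d) and on the right (%d) don't match!",
          rowCountL, rowCountR));
    }
  }
}
```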
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java:
##########
@@ -110,6 +118,27 @@ public List<Path> getInputFiles() {
.collect(Collectors.toList());
}
+ /** TODO fix documentation after addition of inputFilesR
+ * Gets the right input {@link Path}s for the rewrite if they exist for all
input files,
+ * otherwise throws a {@link RuntimeException}.
+ *
+ * @return a {@link List} of the associated right input {@link Path}s
+ */
+ public List<List<Path>> getInputFilesR() {
Review Comment:
The `getInputFiles()` method above is not used anywhere. We probably need to
remove it as well. @amousavigourabi