MaxNevermind commented on code in PR #1335:
URL: https://github.com/apache/parquet-java/pull/1335#discussion_r1704882668
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -106,48 +109,63 @@ public class ParquetRewriter implements Closeable {
private int numBlocksRewritten = 0;
// Reader and relevant states of the in-processing input file
private final Queue<TransParquetFileReader> inputFiles = new LinkedList<>();
- // Schema of input files (should be the same) and to write to the output file
- private MessageType schema = null;
- private final Map<ColumnPath, ColumnDescriptor> descriptorsMap;
- // The reader for the current input file
- private TransParquetFileReader reader = null;
- // The metadata of current reader being processed
- private ParquetMetadata meta = null;
- // created_by information of current reader being processed
- private String originalCreatedBy = "";
- // Unique created_by information from all input files
- private final Set<String> allOriginalCreatedBys = new HashSet<>();
+ private final Queue<TransParquetFileReader> inputFilesToJoin = new
LinkedList<>();
+ private MessageType outSchema;
// The index cache strategy
private final IndexCache.CacheStrategy indexCacheStrategy;
+ private final boolean overwriteInputWithJoinColumns;
public ParquetRewriter(RewriteOptions options) throws IOException {
+ this.newCodecName = options.getNewCodecName();
+ this.indexCacheStrategy = options.getIndexCacheStrategy();
+ this.overwriteInputWithJoinColumns =
options.getOverwriteInputWithJoinColumns();
ParquetConfiguration conf = options.getParquetConfiguration();
OutputFile out = options.getParquetOutputFile();
- openInputFiles(options.getParquetInputFiles(), conf);
- LOG.info("Start rewriting {} input file(s) {} to {}", inputFiles.size(),
options.getParquetInputFiles(), out);
-
- // Init reader of the first input file
- initNextReader();
-
- newCodecName = options.getNewCodecName();
- List<String> pruneColumns = options.getPruneColumns();
- // Prune columns if specified
- if (pruneColumns != null && !pruneColumns.isEmpty()) {
- List<String> paths = new ArrayList<>();
- getPaths(schema, paths, null);
- for (String col : pruneColumns) {
- if (!paths.contains(col)) {
- LOG.warn("Input column name {} doesn't show up in the schema of file
{}", col, reader.getFile());
- }
+ inputFiles.addAll(getFileReaders(options.getParquetInputFiles(), conf));
+
inputFilesToJoin.addAll(getFileReaders(options.getParquetInputFilesToJoin(),
conf));
+ ensureSameSchema(inputFiles);
+ ensureSameSchema(inputFilesToJoin);
Review Comment:
> The inputFilesToJoin are the right-side files, right?
Yes
> Why do they need to have the same schema?
If they don't have the same schema, then how do we figure out which columns
need to be added? I guess we can have a list of columns from the right provided
as a parameter form a user or create a union of all present columns but then we
again getting into complicated implementation imo.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]