MaxNevermind commented on code in PR #1273:
URL: https://github.com/apache/parquet-mr/pull/1273#discussion_r1564278524
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -115,19 +121,55 @@ public class ParquetRewriter implements Closeable {
private ParquetMetadata meta = null;
// created_by information of current reader being processed
private String originalCreatedBy = "";
- // Unique created_by information from all input files
- private final Set<String> allOriginalCreatedBys = new HashSet<>();
// The index cache strategy
private final IndexCache.CacheStrategy indexCacheStrategy;
public ParquetRewriter(RewriteOptions options) throws IOException {
ParquetConfiguration conf = options.getParquetConfiguration();
OutputFile out = options.getParquetOutputFile();
- openInputFiles(options.getParquetInputFiles(), conf);
+ inputFiles.addAll(getFileReaders(options.getParquetInputFiles(), conf));
+ List<Queue<TransParquetFileReader>> inputFilesR = options.getParquetInputFilesR().stream()
+ .map(x -> getFileReaders(x, conf))
+ .collect(Collectors.toList());
+ ensureSameSchema(inputFiles);
+ inputFilesR.forEach(this::ensureSameSchema);
LOG.info("Start rewriting {} input file(s) {} to {}", inputFiles.size(), options.getParquetInputFiles(), out);
- // Init reader of the first input file
- initNextReader();
+ extraMetaData.put(
+ ORIGINAL_CREATED_BY_KEY,
+ Stream.concat(inputFiles.stream(), inputFilesR.stream().flatMap(Collection::stream))
+ .map(x -> x.getFooter().getFileMetaData().getCreatedBy())
+ .collect(Collectors.toSet())
+ .stream()
+ .reduce((a, b) -> a + "\n" + b)
+ .orElse(""));
+ Stream.concat(inputFiles.stream(), inputFilesR.stream().flatMap(Collection::stream))
+ .forEach(x -> extraMetaData.putAll(x.getFileMetaData().getKeyValueMetaData()));
+
+ MessageType schemaL = inputFiles.peek().getFooter().getFileMetaData().getSchema();
+ List<MessageType> schemaR = inputFilesR.stream()
+ .map(x -> x.peek().getFooter().getFileMetaData().getSchema())
+ .collect(Collectors.toList());
+ Map<String, Type> fieldNamesL = new LinkedHashMap<>();
+ schemaL.getFields().forEach(x -> fieldNamesL.put(x.getName(), x));
+ Map<String, Type> fieldNamesR = new LinkedHashMap<>();
+ schemaR.stream().flatMap(x -> x.getFields().stream()).forEach(x -> {
+ if (fieldNamesR.containsKey(x.getName())) {
+ throw new IllegalArgumentException(
+ "Found a duplicated field `" + x.getName() + "` in the right side file groups!");
+ }
+ fieldNamesR.put(x.getName(), x);
+ });
+ List<Type> fields = Stream.concat(
+ fieldNamesL.values().stream()
+ .map(x -> fieldNamesR.getOrDefault(
+ x.getName(), x)), // take a field on the right if we can
+ fieldNamesR.values().stream()
+ .filter(x -> !fieldNamesL.containsKey(
+ x.getName())) // takes fields on the right if it was not present on the left
+ )
+ .collect(Collectors.toList());
+ schema = new MessageType(schemaL.getName(), fields);
Review Comment:
@wgtmac you also mentioned the same problem [in this
comment](https://github.com/apache/parquet-mr/pull/1273/files#r1547172865).
I've just spent some time trying to refactor this part. I tried to align the
schema column order with the actual write order of the columns. It is mostly
doable, but the RightColumnWriter state becomes too complex: for example, a
file queue needs to be present for each individual column on the right. I
think the code will become too difficult to support long term if we do that.
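
For reference, the schema ordering rule from the diff above can be sketched in isolation. This is only an illustration under simplifying assumptions: field types are plain strings rather than `org.apache.parquet.schema.Type`, and the class and method names (`SchemaMergeSketch`, `mergeFields`) are hypothetical, not part of ParquetRewriter. It covers only how the output schema is ordered, not the order in which columns are actually written.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SchemaMergeSketch {

    // Merges field name -> type maps the same way the diff builds `fields`:
    // left fields keep their position (replaced by a same-named right field),
    // then right-only fields are appended in the order they were seen.
    static List<String> mergeFields(Map<String, String> left, List<Map<String, String>> rights) {
        Map<String, String> right = new LinkedHashMap<>();
        for (Map<String, String> group : rights) {
            for (Map.Entry<String, String> e : group.entrySet()) {
                // A field name may appear in at most one right-side file group.
                if (right.putIfAbsent(e.getKey(), e.getValue()) != null) {
                    throw new IllegalArgumentException(
                            "Found a duplicated field `" + e.getKey() + "` in the right side file groups!");
                }
            }
        }
        List<String> merged = new ArrayList<>();
        for (Map.Entry<String, String> e : left.entrySet()) {
            // Take the field from the right if one with the same name exists.
            merged.add(e.getKey() + ":" + right.getOrDefault(e.getKey(), e.getValue()));
        }
        for (Map.Entry<String, String> e : right.entrySet()) {
            // Append fields that exist only on the right.
            if (!left.containsKey(e.getKey())) {
                merged.add(e.getKey() + ":" + e.getValue());
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> left = new LinkedHashMap<>();
        left.put("id", "int64");
        left.put("name", "binary");

        Map<String, String> rightGroup1 = new LinkedHashMap<>();
        rightGroup1.put("name", "binary(masked)");
        Map<String, String> rightGroup2 = new LinkedHashMap<>();
        rightGroup2.put("score", "double");

        System.out.println(mergeFields(left, Arrays.asList(rightGroup1, rightGroup2)));
    }
}
```

With these inputs the merged order is `id`, `name` (taken from the right), then `score`, so an overridden column keeps its left-side position in the schema even though its data comes from a right-side file.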
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]