MaxNevermind commented on code in PR #1273:
URL: https://github.com/apache/parquet-mr/pull/1273#discussion_r1560393432
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -115,19 +117,55 @@ public class ParquetRewriter implements Closeable {
private ParquetMetadata meta = null;
// created_by information of current reader being processed
private String originalCreatedBy = "";
- // Unique created_by information from all input files
- private final Set<String> allOriginalCreatedBys = new HashSet<>();
// The index cache strategy
private final IndexCache.CacheStrategy indexCacheStrategy;
public ParquetRewriter(RewriteOptions options) throws IOException {
ParquetConfiguration conf = options.getParquetConfiguration();
OutputFile out = options.getParquetOutputFile();
- openInputFiles(options.getParquetInputFiles(), conf);
+ inputFiles.addAll(getFileReaders(options.getParquetInputFiles(), conf));
+ List<Queue<TransParquetFileReader>> inputFilesR =
options.getParquetInputFilesR().stream()
+ .map(x -> getFileReaders(x, conf))
+ .collect(Collectors.toList());
+ ensureSameSchema(inputFiles);
+ inputFilesR.forEach(this::ensureSameSchema);
LOG.info("Start rewriting {} input file(s) {} to {}", inputFiles.size(),
options.getParquetInputFiles(), out);
- // Init reader of the first input file
- initNextReader();
+ extraMetaData.put(
+ ORIGINAL_CREATED_BY_KEY,
+ Stream.concat(inputFiles.stream(),
inputFilesR.stream().flatMap(Collection::stream))
+ .map(x -> x.getFooter().getFileMetaData().getCreatedBy())
+ .collect(Collectors.toSet())
+ .stream()
+ .reduce((a, b) -> a + "\n" + b)
+ .orElse(""));
+ Stream.concat(inputFiles.stream(),
inputFilesR.stream().flatMap(Collection::stream))
+ .forEach(x ->
extraMetaData.putAll(x.getFileMetaData().getKeyValueMetaData()));
+
+ MessageType schemaL =
inputFiles.peek().getFooter().getFileMetaData().getSchema();
+ List<MessageType> schemaR = inputFilesR.stream()
+ .map(x -> x.peek().getFooter().getFileMetaData().getSchema())
+ .collect(Collectors.toList());
+ Map<String, Type> fieldNamesL = new LinkedHashMap<>();
+ schemaL.getFields().forEach(x -> fieldNamesL.put(x.getName(), x));
+ Map<String, Type> fieldNamesR = new LinkedHashMap<>();
+ schemaR.stream().flatMap(x -> x.getFields().stream()).forEach(x -> {
+ if (fieldNamesR.containsKey(x.getName())) {
+ throw new IllegalArgumentException(
+ "Found a duplicated field `" + x.getName() + "` in the right side
file groups!");
+ }
+ fieldNamesR.put(x.getName(), x);
+ });
+ List<Type> fields = Stream.concat(
+ fieldNamesL.values().stream()
+ .map(x -> fieldNamesR.getOrDefault(
Review Comment:
> resolve strategy for duplicate
Can you please clarify what you mean by that? Do you mean that if the schema
of a column on the left side is "compatible" with the schema of the same-named
column on the right side, we should try to "merge" the two columns?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]