wgtmac commented on code in PR #1335:
URL: https://github.com/apache/parquet-java/pull/1335#discussion_r1706401936
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java:
##########
@@ -358,6 +466,9 @@ public Builder indexCacheStrategy(IndexCache.CacheStrategy
cacheStrategy) {
public RewriteOptions build() {
Preconditions.checkArgument(inputFiles != null && !inputFiles.isEmpty(),
"Input file is required");
Preconditions.checkArgument(outputFile != null, "Output file is
required");
+ Preconditions.checkArgument(
+ inputFilesToJoin == null || !inputFiles.isEmpty(),
Review Comment:
```suggestion
inputFilesToJoin == null || !inputFilesToJoin.isEmpty(),
```
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -106,48 +109,63 @@ public class ParquetRewriter implements Closeable {
private int numBlocksRewritten = 0;
// Reader and relevant states of the in-processing input file
private final Queue<TransParquetFileReader> inputFiles = new LinkedList<>();
- // Schema of input files (should be the same) and to write to the output file
- private MessageType schema = null;
- private final Map<ColumnPath, ColumnDescriptor> descriptorsMap;
- // The reader for the current input file
- private TransParquetFileReader reader = null;
- // The metadata of current reader being processed
- private ParquetMetadata meta = null;
- // created_by information of current reader being processed
- private String originalCreatedBy = "";
- // Unique created_by information from all input files
- private final Set<String> allOriginalCreatedBys = new HashSet<>();
+ private final Queue<TransParquetFileReader> inputFilesToJoin = new
LinkedList<>();
+ private MessageType outSchema;
// The index cache strategy
private final IndexCache.CacheStrategy indexCacheStrategy;
+ private final boolean overwriteInputWithJoinColumns;
public ParquetRewriter(RewriteOptions options) throws IOException {
+ this.newCodecName = options.getNewCodecName();
+ this.indexCacheStrategy = options.getIndexCacheStrategy();
+ this.overwriteInputWithJoinColumns =
options.getOverwriteInputWithJoinColumns();
ParquetConfiguration conf = options.getParquetConfiguration();
OutputFile out = options.getParquetOutputFile();
- openInputFiles(options.getParquetInputFiles(), conf);
- LOG.info("Start rewriting {} input file(s) {} to {}", inputFiles.size(),
options.getParquetInputFiles(), out);
-
- // Init reader of the first input file
- initNextReader();
-
- newCodecName = options.getNewCodecName();
- List<String> pruneColumns = options.getPruneColumns();
- // Prune columns if specified
- if (pruneColumns != null && !pruneColumns.isEmpty()) {
- List<String> paths = new ArrayList<>();
- getPaths(schema, paths, null);
- for (String col : pruneColumns) {
- if (!paths.contains(col)) {
- LOG.warn("Input column name {} doesn't show up in the schema of file
{}", col, reader.getFile());
- }
+ inputFiles.addAll(getFileReaders(options.getParquetInputFiles(), conf));
+
inputFilesToJoin.addAll(getFileReaders(options.getParquetInputFilesToJoin(),
conf));
+ ensureSameSchema(inputFiles);
+ ensureSameSchema(inputFilesToJoin);
+ LOG.info(
+ "Start rewriting {} input file(s) {} to {}",
+ inputFiles.size() + inputFilesToJoin.size(),
+ Stream.concat(options.getParquetInputFiles().stream(),
options.getParquetInputFilesToJoin().stream())
+ .collect(Collectors.toList()),
+ out);
+
+ this.outSchema = getSchema();
+ this.outSchema = pruneColumnsInSchema(outSchema,
options.getPruneColumns());
+
+ List<TransParquetFileReader> allFiles;
Review Comment:
What about adding a separate `mergeExtraMetaData()` method?
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -267,100 +282,157 @@ public void close() throws IOException {
}
public void processBlocks() throws IOException {
- while (reader != null) {
- IndexCache indexCache = IndexCache.create(reader,
descriptorsMap.keySet(), indexCacheStrategy, true);
- processBlocksFromReader(indexCache);
- indexCache.clean();
- initNextReader();
- }
- }
-
- private void processBlocksFromReader(IndexCache indexCache) throws
IOException {
- for (int blockId = 0; blockId < meta.getBlocks().size(); blockId++) {
- BlockMetaData blockMetaData = meta.getBlocks().get(blockId);
- writer.startBlock(blockMetaData.getRowCount());
- indexCache.setBlockMetadata(blockMetaData);
- List<ColumnChunkMetaData> columnsInOrder = blockMetaData.getColumns();
- for (int i = 0, columnId = 0; i < columnsInOrder.size(); i++) {
- ColumnChunkMetaData chunk = columnsInOrder.get(i);
- ColumnDescriptor descriptor = descriptorsMap.get(chunk.getPath());
-
- // This column has been pruned.
- if (descriptor == null) {
- continue;
- }
-
- // If a column is encrypted, we simply throw exception.
- // Later we can add a feature to trans-encrypt it with different keys
- if (chunk.isEncrypted()) {
- throw new IOException("Column " + chunk.getPath().toDotString() + "
is already encrypted");
+ TransParquetFileReader readerJoin = inputFilesToJoin.peek();
Review Comment:
The same renaming (adding "To" for clarity, e.g. `readerToJoin`) applies to the related variables below.
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -267,100 +282,157 @@ public void close() throws IOException {
}
public void processBlocks() throws IOException {
- while (reader != null) {
- IndexCache indexCache = IndexCache.create(reader,
descriptorsMap.keySet(), indexCacheStrategy, true);
- processBlocksFromReader(indexCache);
- indexCache.clean();
- initNextReader();
- }
- }
-
- private void processBlocksFromReader(IndexCache indexCache) throws
IOException {
- for (int blockId = 0; blockId < meta.getBlocks().size(); blockId++) {
- BlockMetaData blockMetaData = meta.getBlocks().get(blockId);
- writer.startBlock(blockMetaData.getRowCount());
- indexCache.setBlockMetadata(blockMetaData);
- List<ColumnChunkMetaData> columnsInOrder = blockMetaData.getColumns();
- for (int i = 0, columnId = 0; i < columnsInOrder.size(); i++) {
- ColumnChunkMetaData chunk = columnsInOrder.get(i);
- ColumnDescriptor descriptor = descriptorsMap.get(chunk.getPath());
-
- // This column has been pruned.
- if (descriptor == null) {
- continue;
- }
-
- // If a column is encrypted, we simply throw exception.
- // Later we can add a feature to trans-encrypt it with different keys
- if (chunk.isEncrypted()) {
- throw new IOException("Column " + chunk.getPath().toDotString() + "
is already encrypted");
+ TransParquetFileReader readerJoin = inputFilesToJoin.peek();
+ IndexCache indexCacheJoin = null;
+ int blockIdxJoin = -1;
+ List<ColumnDescriptor> outColumns = outSchema.getColumns();
+
+ while (!inputFiles.isEmpty()) {
+ TransParquetFileReader reader = inputFiles.poll();
+ LOG.info("Rewriting input file: {}, remaining files: {}",
reader.getFile(), inputFiles.size());
+ ParquetMetadata meta = reader.getFooter();
+ Set<ColumnPath> columnPaths =
meta.getFileMetaData().getSchema().getColumns().stream()
+ .map(x -> ColumnPath.get(x.getPath()))
+ .collect(Collectors.toSet());
+ IndexCache indexCache = IndexCache.create(reader, columnPaths,
indexCacheStrategy, true);
+
+ for (int blockIdx = 0; blockIdx < meta.getBlocks().size(); blockIdx++) {
+ BlockMetaData blockMetaData = meta.getBlocks().get(blockIdx);
+ writer.startBlock(blockMetaData.getRowCount());
+ indexCache.setBlockMetadata(blockMetaData);
+ Map<ColumnPath, ColumnChunkMetaData> pathToChunk =
+ blockMetaData.getColumns().stream().collect(Collectors.toMap(x ->
x.getPath(), x -> x));
+
+ if (readerJoin != null
+ && (blockIdxJoin == -1
+ || ++blockIdxJoin
+ == readerJoin.getFooter().getBlocks().size())) {
+ blockIdxJoin = 0;
+ readerJoin = inputFilesToJoin.poll();
+ Set<ColumnPath> columnPathsJoin =
readerJoin.getFileMetaData().getSchema().getColumns().stream()
+ .map(x -> ColumnPath.get(x.getPath()))
+ .collect(Collectors.toSet());
+ if (indexCacheJoin != null) {
+ indexCacheJoin.clean();
+ }
+ indexCacheJoin = IndexCache.create(readerJoin, columnPathsJoin,
indexCacheStrategy, true);
+ indexCacheJoin.setBlockMetadata(
+ readerJoin.getFooter().getBlocks().get(blockIdxJoin));
+ } else {
+ blockIdxJoin++;
}
- reader.setStreamPosition(chunk.getStartingPos());
- CompressionCodecName newCodecName = this.newCodecName == null ?
chunk.getCodec() : this.newCodecName;
- boolean encryptColumn =
- encryptMode && encryptColumns != null &&
encryptColumns.contains(chunk.getPath());
-
- if (maskColumns != null && maskColumns.containsKey(chunk.getPath())) {
- // Mask column and compress it again.
- MaskMode maskMode = maskColumns.get(chunk.getPath());
- if (maskMode.equals(MaskMode.NULLIFY)) {
- Type.Repetition repetition =
- descriptor.getPrimitiveType().getRepetition();
- if (repetition.equals(Type.Repetition.REQUIRED)) {
- throw new IOException("Required column ["
- + descriptor.getPrimitiveType().getName() + "] cannot be
nullified");
+ for (int outColumnIdx = 0; outColumnIdx < outColumns.size();
outColumnIdx++) {
+ ColumnPath colPath =
+ ColumnPath.get(outColumns.get(outColumnIdx).getPath());
+ if (readerJoin != null) {
+ Optional<ColumnChunkMetaData> chunkJoin =
Review Comment:
```suggestion
Optional<ColumnChunkMetaData> chunkToJoin =
```
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java:
##########
@@ -39,38 +39,55 @@
/**
* A set of options to create a ParquetRewriter.
+ *
+ * TODO find a place where to put a proper description of functionality as it
is not trivial:
Review Comment:
It seems that we are missing a README.md on the rewriter usage.
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -106,48 +109,63 @@ public class ParquetRewriter implements Closeable {
private int numBlocksRewritten = 0;
// Reader and relevant states of the in-processing input file
private final Queue<TransParquetFileReader> inputFiles = new LinkedList<>();
- // Schema of input files (should be the same) and to write to the output file
- private MessageType schema = null;
- private final Map<ColumnPath, ColumnDescriptor> descriptorsMap;
- // The reader for the current input file
- private TransParquetFileReader reader = null;
- // The metadata of current reader being processed
- private ParquetMetadata meta = null;
- // created_by information of current reader being processed
- private String originalCreatedBy = "";
- // Unique created_by information from all input files
- private final Set<String> allOriginalCreatedBys = new HashSet<>();
+ private final Queue<TransParquetFileReader> inputFilesToJoin = new
LinkedList<>();
+ private MessageType outSchema;
// The index cache strategy
private final IndexCache.CacheStrategy indexCacheStrategy;
+ private final boolean overwriteInputWithJoinColumns;
public ParquetRewriter(RewriteOptions options) throws IOException {
+ this.newCodecName = options.getNewCodecName();
+ this.indexCacheStrategy = options.getIndexCacheStrategy();
+ this.overwriteInputWithJoinColumns =
options.getOverwriteInputWithJoinColumns();
ParquetConfiguration conf = options.getParquetConfiguration();
OutputFile out = options.getParquetOutputFile();
- openInputFiles(options.getParquetInputFiles(), conf);
- LOG.info("Start rewriting {} input file(s) {} to {}", inputFiles.size(),
options.getParquetInputFiles(), out);
-
- // Init reader of the first input file
- initNextReader();
-
- newCodecName = options.getNewCodecName();
- List<String> pruneColumns = options.getPruneColumns();
- // Prune columns if specified
- if (pruneColumns != null && !pruneColumns.isEmpty()) {
- List<String> paths = new ArrayList<>();
- getPaths(schema, paths, null);
- for (String col : pruneColumns) {
- if (!paths.contains(col)) {
- LOG.warn("Input column name {} doesn't show up in the schema of file
{}", col, reader.getFile());
- }
+ inputFiles.addAll(getFileReaders(options.getParquetInputFiles(), conf));
+
inputFilesToJoin.addAll(getFileReaders(options.getParquetInputFilesToJoin(),
conf));
+ ensureSameSchema(inputFiles);
+ ensureSameSchema(inputFilesToJoin);
+ LOG.info(
+ "Start rewriting {} input file(s) {} to {}",
+ inputFiles.size() + inputFilesToJoin.size(),
+ Stream.concat(options.getParquetInputFiles().stream(),
options.getParquetInputFilesToJoin().stream())
+ .collect(Collectors.toList()),
+ out);
+
+ this.outSchema = getSchema();
+ this.outSchema = pruneColumnsInSchema(outSchema,
options.getPruneColumns());
+
+ List<TransParquetFileReader> allFiles;
+ if (options.getIgnoreJoinFilesMetadata()) {
+ allFiles = new ArrayList<>(inputFiles);
+ } else {
+ allFiles = Stream.concat(inputFiles.stream(), inputFilesToJoin.stream())
+ .collect(Collectors.toList());
+ }
+ extraMetaData.put(
+ ORIGINAL_CREATED_BY_KEY,
+ allFiles.stream()
+ .map(x -> x.getFooter().getFileMetaData().getCreatedBy())
+ .collect(Collectors.toSet())
+ .stream()
+ .reduce((a, b) -> a + "\n" + b)
+ .orElse(""));
+ allFiles.forEach(x ->
extraMetaData.putAll(x.getFileMetaData().getKeyValueMetaData()));
+
+ if (!inputFilesToJoin.isEmpty()) {
Review Comment:
What about adding a `checkRowCountToJoin()` method?
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -267,100 +282,157 @@ public void close() throws IOException {
}
public void processBlocks() throws IOException {
- while (reader != null) {
- IndexCache indexCache = IndexCache.create(reader,
descriptorsMap.keySet(), indexCacheStrategy, true);
- processBlocksFromReader(indexCache);
- indexCache.clean();
- initNextReader();
- }
- }
-
- private void processBlocksFromReader(IndexCache indexCache) throws
IOException {
- for (int blockId = 0; blockId < meta.getBlocks().size(); blockId++) {
- BlockMetaData blockMetaData = meta.getBlocks().get(blockId);
- writer.startBlock(blockMetaData.getRowCount());
- indexCache.setBlockMetadata(blockMetaData);
- List<ColumnChunkMetaData> columnsInOrder = blockMetaData.getColumns();
- for (int i = 0, columnId = 0; i < columnsInOrder.size(); i++) {
- ColumnChunkMetaData chunk = columnsInOrder.get(i);
- ColumnDescriptor descriptor = descriptorsMap.get(chunk.getPath());
-
- // This column has been pruned.
- if (descriptor == null) {
- continue;
- }
-
- // If a column is encrypted, we simply throw exception.
- // Later we can add a feature to trans-encrypt it with different keys
- if (chunk.isEncrypted()) {
- throw new IOException("Column " + chunk.getPath().toDotString() + "
is already encrypted");
+ TransParquetFileReader readerJoin = inputFilesToJoin.peek();
Review Comment:
```suggestion
TransParquetFileReader readerToJoin = inputFilesToJoin.peek();
```
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -176,22 +192,40 @@ public ParquetRewriter(RewriteOptions options) throws
IOException {
writer.start();
}
+ private MessageType getSchema() {
+ MessageType schemaMain =
inputFiles.peek().getFooter().getFileMetaData().getSchema();
+ if (inputFilesToJoin.isEmpty()) {
+ return schemaMain;
+ } else {
+ Map<String, Type> fieldNames = new LinkedHashMap<>();
+ schemaMain.getFields().forEach(x -> fieldNames.put(x.getName(), x));
+ inputFilesToJoin
+ .peek()
+ .getFooter()
+ .getFileMetaData()
+ .getSchema()
+ .getFields()
+ .forEach(x -> {
+ if (!fieldNames.containsKey(x.getName()) ||
overwriteInputWithJoinColumns) {
+ fieldNames.put(x.getName(), x);
Review Comment:
Please add a log statement to indicate when `overwriteInputWithJoinColumns` takes
effect for a column (i.e. when a join column overwrites an input column).
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java:
##########
@@ -325,6 +388,18 @@ public Builder addInputFile(Path path) {
return this;
}
+ /** TODO fix documentation after addition of InputFilesToJoin
Review Comment:
Should we remove the TODO now?
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -267,100 +282,157 @@ public void close() throws IOException {
}
public void processBlocks() throws IOException {
- while (reader != null) {
- IndexCache indexCache = IndexCache.create(reader,
descriptorsMap.keySet(), indexCacheStrategy, true);
- processBlocksFromReader(indexCache);
- indexCache.clean();
- initNextReader();
- }
- }
-
- private void processBlocksFromReader(IndexCache indexCache) throws
IOException {
- for (int blockId = 0; blockId < meta.getBlocks().size(); blockId++) {
- BlockMetaData blockMetaData = meta.getBlocks().get(blockId);
- writer.startBlock(blockMetaData.getRowCount());
- indexCache.setBlockMetadata(blockMetaData);
- List<ColumnChunkMetaData> columnsInOrder = blockMetaData.getColumns();
- for (int i = 0, columnId = 0; i < columnsInOrder.size(); i++) {
- ColumnChunkMetaData chunk = columnsInOrder.get(i);
- ColumnDescriptor descriptor = descriptorsMap.get(chunk.getPath());
-
- // This column has been pruned.
- if (descriptor == null) {
- continue;
- }
-
- // If a column is encrypted, we simply throw exception.
- // Later we can add a feature to trans-encrypt it with different keys
- if (chunk.isEncrypted()) {
- throw new IOException("Column " + chunk.getPath().toDotString() + "
is already encrypted");
+ TransParquetFileReader readerJoin = inputFilesToJoin.peek();
+ IndexCache indexCacheJoin = null;
+ int blockIdxJoin = -1;
+ List<ColumnDescriptor> outColumns = outSchema.getColumns();
+
+ while (!inputFiles.isEmpty()) {
+ TransParquetFileReader reader = inputFiles.poll();
+ LOG.info("Rewriting input file: {}, remaining files: {}",
reader.getFile(), inputFiles.size());
+ ParquetMetadata meta = reader.getFooter();
+ Set<ColumnPath> columnPaths =
meta.getFileMetaData().getSchema().getColumns().stream()
+ .map(x -> ColumnPath.get(x.getPath()))
+ .collect(Collectors.toSet());
+ IndexCache indexCache = IndexCache.create(reader, columnPaths,
indexCacheStrategy, true);
+
+ for (int blockIdx = 0; blockIdx < meta.getBlocks().size(); blockIdx++) {
+ BlockMetaData blockMetaData = meta.getBlocks().get(blockIdx);
+ writer.startBlock(blockMetaData.getRowCount());
+ indexCache.setBlockMetadata(blockMetaData);
+ Map<ColumnPath, ColumnChunkMetaData> pathToChunk =
+ blockMetaData.getColumns().stream().collect(Collectors.toMap(x ->
x.getPath(), x -> x));
+
+ if (readerJoin != null
+ && (blockIdxJoin == -1
+ || ++blockIdxJoin
+ == readerJoin.getFooter().getBlocks().size())) {
+ blockIdxJoin = 0;
+ readerJoin = inputFilesToJoin.poll();
Review Comment:
It seems that close() is not called for the reader.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]