ConeyLiu commented on code in PR #1273:
URL: https://github.com/apache/parquet-mr/pull/1273#discussion_r1558951400
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -904,4 +955,175 @@ public byte[] getDictPageAAD() {
return this.dictPageAAD;
}
}
+
+ private static class RightColumnWriter {
+ private final Queue<TransParquetFileReader> inputFiles;
+ private final ParquetRewriter parquetRewriter;
+ private final ParquetFileWriter writer;
+ private final MessageType schema;
+ private final Map<ColumnPath, ColumnDescriptor> descriptorsMap;
+ private final Map<ColumnDescriptor, ColumnReader> colReaders = new
HashMap<>();
+ private final Map<ColumnDescriptor, ColumnChunkPageWriteStore> cPageStores
= new HashMap<>();
+ private final Map<ColumnDescriptor, ColumnWriteStore> cStores = new
HashMap<>();
+ private final Map<ColumnDescriptor, ColumnWriter> cWriters = new
HashMap<>();
+ private int rowGroupIdxL = 0; // index of the rowGroup of the current file
on the left
+ private int rowGroupIdxR = 0; // index of the rowGroup of the current file
on the right
+ private int writtenFromBlock = 0;
+
+ public RightColumnWriter(Queue<TransParquetFileReader> inputFiles,
ParquetRewriter parquetRewriter)
+ throws IOException {
+ this.inputFiles = inputFiles;
+ this.parquetRewriter = parquetRewriter;
+ this.writer = parquetRewriter.writer;
+ this.schema =
inputFiles.peek().getFooter().getFileMetaData().getSchema();
+ this.descriptorsMap = this.schema.getColumns().stream()
+ .collect(Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));
+ initReaders();
+ initWriters();
+ }
+
+ public void writeRows(int rowGroupIdx, long rowsToWrite) throws
IOException {
+ if (rowGroupIdxL > rowGroupIdx) {
+ throw new IOException("A row group index decrease is determined in
RightColumnWriter! Current index: "
+ + rowGroupIdxL + ", new index: " + rowGroupIdx);
+ }
+ if (rowGroupIdxL != rowGroupIdx) {
+ rowGroupIdxL = rowGroupIdx;
+ flushWriters();
+ initWriters();
+ }
+ while (rowsToWrite > 0) {
+ List<BlockMetaData> blocks = inputFiles.peek().getFooter().getBlocks();
+ BlockMetaData block = blocks.get(rowGroupIdxR);
+ List<ColumnChunkMetaData> chunks = block.getColumns();
+ long leftInBlock = block.getRowCount() - writtenFromBlock;
+ long writeFromBlock = Math.min(rowsToWrite, leftInBlock);
+ for (ColumnChunkMetaData chunk : chunks) {
+ if (chunk.isEncrypted()) {
Review Comment:
Why not perform this `isEncrypted()` check in `initWriters` instead? We should fail fast.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]