[
https://issues.apache.org/jira/browse/PARQUET-1381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17745963#comment-17745963
]
ASF GitHub Bot commented on PARQUET-1381:
-----------------------------------------
wgtmac commented on code in PR #1121:
URL: https://github.com/apache/parquet-mr/pull/1121#discussion_r1271309683
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RowGroupMerger.java:
##########
@@ -0,0 +1,657 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.rewrite;
+
+import static java.lang.String.format;
+import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.VALUES;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Collections;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.AbstractMap.SimpleEntry;
+import java.util.Map.Entry;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.DataPageV2;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.hadoop.CodecFactory;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+
+class RowGroupMerger {
+
+ private final MessageType schema;
+ private final CompressionCodecFactory.BytesInputCompressor compressor;
+ private final ParquetProperties parquetProperties;
+
+  public RowGroupMerger(MessageType schema, CompressionCodecName compression, boolean useV2ValueWriter) {
+    this(schema, new Configuration(), compression, useV2ValueWriter);
+  }
+
+  RowGroupMerger(MessageType schema, Configuration conf, CompressionCodecName compression, boolean useV2ValueWriter) {
+    this(schema, conf, compression, createParquetProperties(useV2ValueWriter));
+  }
+
+  RowGroupMerger(MessageType schema, Configuration conf, CompressionCodecName compression, ParquetProperties parquetProperties) {
+    this.schema = schema;
+    this.parquetProperties = parquetProperties;
+    this.compressor = new CodecFactory(conf, this.parquetProperties.getPageSizeThreshold()).getCompressor(compression);
+ }
+
+ /**
+   * Merges the row groups making sure that new row groups do not exceed the supplied maxRowGroupSize
+ *
+ * @param inputFiles input files to merge
+ * @param maxRowGroupSize the max limit for new blocks
+ * @param writer writer to write the new blocks to
+ * @throws IOException if an IO error occurs
+ */
+  public void merge(List<ParquetFileReader> inputFiles, final long maxRowGroupSize,
+                    ParquetFileWriter writer) throws IOException {
+
+    SizeEstimator estimator = new SizeEstimator(compressor.getCodecName() != CompressionCodecName.UNCOMPRESSED);
+ MutableMergedBlock mergedBlock = null;
+ for (ParquetFileReader reader : inputFiles) {
+ for (BlockMetaData blockMeta : reader.getRowGroups()) {
+ PageReadStore group = reader.readNextRowGroup();
+ Preconditions.checkState(group != null,
+ "number of groups returned by FileReader does not match metadata");
+
+        if (mergedBlock != null && mergedBlock.getCompressedSize() + estimator.estimate(blockMeta) > maxRowGroupSize) {
+          saveBlockTo(mergedBlock, writer);
+          mergedBlock = null;
+        }
+
+        if (mergedBlock == null && estimator.estimate(blockMeta) > maxRowGroupSize) {
+          //save it directly without re encoding it
+          saveBlockTo(ReadOnlyMergedBlock.of(blockMeta, group, schema, compressor), writer);
+ continue;
+ }
+
+ if (mergedBlock == null) {
+ mergedBlock = new MutableMergedBlock(schema);
+ }
+
+ long sizeBeforeMerge = mergedBlock.getCompressedSize();
+ mergedBlock.merge(blockMeta, group);
+ //update our estimator
+        long currentBlockEffect = mergedBlock.getCompressedSize() - sizeBeforeMerge;
+ estimator.update(currentBlockEffect, blockMeta);
+ }
+ }
+ if (mergedBlock != null) {
+ saveBlockTo(mergedBlock, writer);
+ }
+ mergedBlock = null;
+ }
+
+
+ private void saveBlockTo(MergedBlock block, ParquetFileWriter writer) {
+ try {
+ writer.startBlock(block.rowCount());
+
+ for (MergedColumn col : block.columnsInOrder()) {
+        writer.startColumn(col.getColumnDesc(), col.getValueCount(), col.getCompression());
+
+ col.writeDictionaryPageTo(writer);
+ col.writeDataPagesTo(writer);
+
+ writer.endColumn();
+ }
+
+ writer.endBlock();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+  private static ParquetProperties createParquetProperties(boolean useV2Writer) {
+ ParquetProperties.Builder builder = ParquetProperties.builder();
+ if (useV2Writer) {
+ builder.withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0);
+ }
+ return builder.build();
+ }
+
+ private BytesInput compress(BytesInput bytes) {
+ return compress(bytes, compressor);
+ }
+
+  private static BytesInput compress(BytesInput bytes, CompressionCodecFactory.BytesInputCompressor compressor) {
+ try {
+ //we copy as some compressors use shared memory
+ return BytesInput.copy(compressor.compress(bytes));
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+  private BiConsumer<ValuesReader, ValuesWriter> createWritingBridge(PrimitiveType.PrimitiveTypeName typeName) {
+ switch (typeName) {
+ case FIXED_LEN_BYTE_ARRAY:
+ case INT96:
+ case BINARY:
+ return (src, dest) -> dest.writeBytes(src.readBytes());
+ case BOOLEAN:
+ return (src, dest) -> dest.writeBoolean(src.readBoolean());
+ case DOUBLE:
+ return (src, dest) -> dest.writeDouble(src.readDouble());
+ case FLOAT:
+ return (src, dest) -> dest.writeFloat(src.readFloat());
+ case INT32:
+ return (src, dest) -> dest.writeInteger(src.readInteger());
+ case INT64:
+ return (src, dest) -> dest.writeLong(src.readLong());
+ default:
+        throw new RuntimeException("Unsupported column primitive type: " + typeName.name());
+ }
+ }
+
+  private static void writePageTo(DataPage dataPage, ParquetFileWriter writer) {
+ dataPage.accept(new DataPage.Visitor<Void>() {
+ @Override
+ public Void visit(DataPageV1 page) {
+ try {
+ if (page.getIndexRowCount().isPresent()) {
+            writer.writeDataPage(page.getValueCount(), page.getUncompressedSize(),
+              page.getBytes(), page.getStatistics(), page.getIndexRowCount().get(), page.getRlEncoding(),
+              page.getDlEncoding(), page.getValueEncoding());
+
+          } else {
+            writer.writeDataPage(page.getValueCount(), page.getUncompressedSize(),
+              page.getBytes(), page.getStatistics(), page.getRlEncoding(),
+              page.getDlEncoding(), page.getValueEncoding());
+ }
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ return null;
+ }
+
+ @Override
+ public Void visit(DataPageV2 page) {
+ try {
+          writer.writeDataPageV2(page.getRowCount(), page.getNullCount(), page.getValueCount(),
+            page.getRepetitionLevels(), page.getDefinitionLevels(), page.getDataEncoding(),
+            page.getData(), page.getUncompressedSize(), page.getStatistics());
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ return null;
+ }
+ });
+ }
+
+  private static DictionaryPage getCompressedDictionary(DictionaryPage dictionary, CompressionCodecFactory.BytesInputCompressor compressor) {
+ return new DictionaryPage(
+ compress(dictionary.getBytes(), compressor),
+ dictionary.getUncompressedSize(),
+ dictionary.getDictionarySize(),
+ dictionary.getEncoding());
+ }
+
+ private interface MergedBlock {
+ long rowCount();
+
+ List<MergedColumn> columnsInOrder();
+ }
+
+ private interface MergedColumn {
+ long getValueCount();
+
+ CompressionCodecName getCompression();
+
+ ColumnDescriptor getColumnDesc();
+
+ void writeDataPagesTo(ParquetFileWriter writer);
+
+ void writeDictionaryPageTo(ParquetFileWriter writer) throws IOException;
+ }
+
+ private class MutableMergedBlock implements MergedBlock {
+
+    private final Map<ColumnDescriptor, MutableMergedColumn> columns = new HashMap<>();
+ private final MessageType schema;
+ private long recordCount;
+ private long compressedSize;
+
+ private MutableMergedBlock(MessageType schema) {
+ this.schema = schema;
+ }
+
+ @Override
+ public long rowCount() {
+ return recordCount;
+ }
+
+ private long getCompressedSize() {
+ return compressedSize;
+ }
+
+ @Override
+ public List<MergedColumn> columnsInOrder() {
+ return schema.getColumns()
+ .stream()
+ .map(columns::get)
+ .collect(Collectors.toList());
+ }
+
+ MutableMergedColumn getOrCreateColumn(ColumnDescriptor column) {
+      return columns.computeIfAbsent(column, desc -> new MutableMergedColumn(desc, this::addCompressedBytes));
+ }
+
+ void addRowCount(long rowCount) {
+ recordCount += rowCount;
+ }
+
+ void addCompressedBytes(long size) {
+ compressedSize += size;
+ }
+
+    void merge(BlockMetaData blockMeta, PageReadStore group) throws IOException {
+      for (Entry<ColumnDescriptor, ColumnChunkMetaData> col : getColumnsInOrder(blockMeta, schema)) {
+
+ MutableMergedColumn column = getOrCreateColumn(col.getKey());
+ PageReader columnReader = group.getPageReader(col.getKey());
+
+ DictionaryPage dictPage = columnReader.readDictionaryPage();
+ Dictionary decodedDictionary = null;
+ if (dictPage != null) {
+          decodedDictionary = dictPage.getEncoding().initDictionary(column.getColumnDesc(), dictPage);
+ }
+
+ //read all pages in this column chunk
+ DataPage data;
+ while ((data = columnReader.readPage()) != null) {
+ column.addPage(data, decodedDictionary);
+ }
+ }
+ addRowCount(blockMeta.getRowCount());
+ }
+ }
+
+  private static List<Entry<ColumnDescriptor, ColumnChunkMetaData>> getColumnsInOrder(BlockMetaData blockMeta,
+                                                                                      MessageType schema) {
+
+    return ParquetFileWriter.getColumnsInOrder(blockMeta, schema, false).stream()
+ .map(c -> toEntry(schema, c))
+ .collect(Collectors.toList());
+ }
+
+  private static SimpleEntry<ColumnDescriptor, ColumnChunkMetaData> toEntry(MessageType schema,
+                                                                            ColumnChunkMetaData column) {
+
+ return new SimpleEntry<>(
+ schema.getColumnDescription(column.getPath().toArray()),
+ column);
+ }
+
+ private static class ReadOnlyMergedBlock implements MergedBlock {
+ private final List<MergedColumn> columns;
+ private final long recordCount;
+
+ private ReadOnlyMergedBlock(List<MergedColumn> columns, long recordCount) {
+ this.columns = Collections.unmodifiableList(columns);
+ this.recordCount = recordCount;
+ }
+
+ @Override
+ public long rowCount() {
+ return recordCount;
+ }
+
+ @Override
+ public List<MergedColumn> columnsInOrder() {
+ return columns;
+ }
+
+    static ReadOnlyMergedBlock of(BlockMetaData blockMeta, PageReadStore group, MessageType schema,
+                                  CompressionCodecFactory.BytesInputCompressor compressor) {
+      List<MergedColumn> columns = new ArrayList<>();
+      for (Entry<ColumnDescriptor, ColumnChunkMetaData> col : getColumnsInOrder(blockMeta, schema)) {
+
+ List<DataPage> pages = new ArrayList<>();
+ PageReader columnReader = group.getPageReader(col.getKey());
+
+ DictionaryPage dictPage = columnReader.readDictionaryPage();
+ if (dictPage != null) {
+ dictPage = getCompressedDictionary(dictPage, compressor);
+ }
+
+ //read all pages in this column chunk
+ DataPage data;
+ while ((data = columnReader.readPage()) != null) {
+
+ data = data.accept(new DataPage.Visitor<DataPage>() {
+ @Override
+ public DataPage visit(DataPageV1 pageV1) {
+
+            return new DataPageV1(compress(pageV1.getBytes(), compressor), pageV1.getValueCount(),
Review Comment:
Why does DataPageV1 need to be compressed again here while DataPageV2 does not (line 384 below)?
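For side-by-side context, a condensed copy of the two branches of this visitor as they appear in this diff (the DataPageV2 branch is shown in a later hunk below); only the compression-relevant calls are annotated:
{code:java}
@Override
public DataPage visit(DataPageV1 pageV1) {
  // V1: getBytes() is the whole page payload (levels + values), and the entire
  // payload is run through the compressor before the page is re-emitted.
  return new DataPageV1(compress(pageV1.getBytes(), compressor), pageV1.getValueCount(),
    pageV1.getUncompressedSize(), pageV1.getFirstRowIndex().orElse(-1L),
    pageV1.getIndexRowCount().orElse(-1), pageV1.getStatistics(), pageV1.getRlEncoding(),
    pageV1.getDlEncoding(), pageV1.getValueEncoding());
}

@Override
public DataPage visit(DataPageV2 pageV2) {
  // V2: the repetition/definition level streams are passed through unchanged and
  // only getData() goes through the compressor.
  return DataPageV2.compressed(
    pageV2.getRowCount(), pageV2.getNullCount(), pageV2.getValueCount(),
    pageV2.getRepetitionLevels(), pageV2.getDefinitionLevels(), pageV2.getDataEncoding(),
    compress(pageV2.getData(), compressor), pageV2.getUncompressedSize(), pageV2.getStatistics());
}
{code}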
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RowGroupMerger.java:
##########
@@ -0,0 +1,657 @@
+        if (mergedBlock == null && estimator.estimate(blockMeta) > maxRowGroupSize) {
+          //save it directly without re encoding it
+          saveBlockTo(ReadOnlyMergedBlock.of(blockMeta, group, schema, compressor), writer);
Review Comment:
I agree, so it might be good to integrate this with `ParquetRewriter` if one
row group does not need to be merged.
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RowGroupMerger.java:
##########
@@ -0,0 +1,657 @@
+ //read all pages in this column chunk
+ DataPage data;
+ while ((data = columnReader.readPage()) != null) {
+
+ data = data.accept(new DataPage.Visitor<DataPage>() {
+ @Override
+ public DataPage visit(DataPageV1 pageV1) {
+
+            return new DataPageV1(compress(pageV1.getBytes(), compressor), pageV1.getValueCount(),
+              pageV1.getUncompressedSize(), pageV1.getFirstRowIndex().orElse(-1L),
+              pageV1.getIndexRowCount().orElse(-1), pageV1.getStatistics(), pageV1.getRlEncoding(),
+              pageV1.getDlEncoding(), pageV1.getValueEncoding());
+          }
+
+          @Override
+          public DataPage visit(DataPageV2 pageV2) {
+
+            return DataPageV2.compressed(
+              pageV2.getRowCount(), pageV2.getNullCount(), pageV2.getValueCount(),
+              pageV2.getRepetitionLevels(), pageV2.getDefinitionLevels(), pageV2.getDataEncoding(),
+              compress(pageV2.getData(), compressor), pageV2.getUncompressedSize(), pageV2.getStatistics());
+ }
+ });
+
+ pages.add(data);
+ }
+
+        ReadOnlyMergedColumn column = new ReadOnlyMergedColumn(pages, dictPage, col.getKey(),
+ col.getValue().getValueCount(), compressor.getCodecName());
+
+ columns.add(column);
+ }
+ return new ReadOnlyMergedBlock(columns, blockMeta.getRowCount());
+ }
+ }
+
+ private static class ReadOnlyMergedColumn implements MergedColumn {
+ private final List<DataPage> pages;
+ private final DictionaryPage dictionary;
+ private final ColumnDescriptor columnDesc;
+ private final long valueCount;
+ private final CompressionCodecName codecName;
+
+    private ReadOnlyMergedColumn(List<DataPage> pages, DictionaryPage dictionary, ColumnDescriptor columnDesc,
+                                 long valueCount, CompressionCodecName codecName) {
+ this.pages = pages;
+ this.dictionary = dictionary;
+ this.columnDesc = columnDesc;
+ this.valueCount = valueCount;
+ this.codecName = codecName;
+ }
+
+ @Override
+ public long getValueCount() {
+ return valueCount;
+ }
+
+ @Override
+ public CompressionCodecName getCompression() {
+ return codecName;
+ }
+
+ @Override
+ public ColumnDescriptor getColumnDesc() {
+ return columnDesc;
+ }
+
+ @Override
+ public void writeDataPagesTo(ParquetFileWriter writer) {
+ pages.forEach(page -> writePageTo(page, writer));
+ }
+
+ @Override
+    public void writeDictionaryPageTo(ParquetFileWriter writer) throws IOException {
+ if (dictionary != null) {
+ writer.writeDictionaryPage(dictionary);
+ }
+ }
+ }
+
+ private class MutableMergedColumn implements MergedColumn {
+
+ private final List<DataPage> pages = new ArrayList<>();
+ private final ColumnDescriptor columnDesc;
+ private final ValuesWriter newValuesWriter;
+ private final BiConsumer<ValuesReader, ValuesWriter> dataWriter;
+ private final Consumer<Long> compressedSizeAccumulator;
+
+ private long valueCount;
+
+    private MutableMergedColumn(ColumnDescriptor column, Consumer<Long> compressedSizeAccumulator) {
+      this.columnDesc = column;
+      this.compressedSizeAccumulator = compressedSizeAccumulator;
+      this.newValuesWriter = parquetProperties.newValuesWriter(columnDesc);
+      this.dataWriter = createWritingBridge(columnDesc.getPrimitiveType().getPrimitiveTypeName());
+ }
+
+ @Override
+ public long getValueCount() {
+ return valueCount;
+ }
+
+ @Override
+ public CompressionCodecName getCompression() {
+ return compressor.getCodecName();
+ }
+
+ @Override
+ public ColumnDescriptor getColumnDesc() {
+ return columnDesc;
+ }
+
+ @Override
+ public void writeDataPagesTo(ParquetFileWriter writer) {
+ pages.forEach(page -> writePageTo(page, writer));
+ }
+
+ @Override
+    public void writeDictionaryPageTo(ParquetFileWriter writer) throws IOException {
+ DictionaryPage page = newValuesWriter.toDictPageAndClose();
+ if (page != null) {
+ writer.writeDictionaryPage(getCompressedDictionary(page, compressor));
+ newValuesWriter.resetDictionary();
+ }
+ }
+
+ void addPage(DataPage data, Dictionary pageDictionary) {
+ DataPage recodePage = recodePage(data, pageDictionary);
+ compressedSizeAccumulator.accept((long) recodePage.getCompressedSize());
+ valueCount += recodePage.getValueCount();
+ pages.add(recodePage);
+ }
+
+ DataPage recodePage(DataPage data, Dictionary pageDictionary) {
+ return data.accept(new DataPage.Visitor<DataPage>() {
+
+ @Override
+ public DataPage visit(DataPageV1 pageV1) {
+
+ try {
+ verifyDataEncoding(pageV1.getValueEncoding(), pageDictionary);
+
+ final byte[] originalBytes = pageV1.getBytes().toByteArray();
+
+ if (pageV1.getValueEncoding().usesDictionary()) {
+
+            ValuesReader rlReader = pageV1.getRlEncoding().getValuesReader(columnDesc, REPETITION_LEVEL);
+            ValuesReader dlReader = pageV1.getDlEncoding().getValuesReader(columnDesc, DEFINITION_LEVEL);
+            ValuesReader dataReader = pageV1.getValueEncoding()
+              .getDictionaryBasedValuesReader(columnDesc, VALUES, pageDictionary);
+
+            ByteBufferInputStream input = ByteBufferInputStream.wrap(ByteBuffer.wrap(originalBytes));
+
+ int startPos = Math.toIntExact(input.position());
+ rlReader.initFromPage(pageV1.getValueCount(), input);
+ dlReader.initFromPage(pageV1.getValueCount(), input);
+ int dlEndPos = Math.toIntExact(input.position());
+
+ dataReader.initFromPage(pageV1.getValueCount(), input);
+
+ int rowCount = 0;
+ for (int i = 0; i < pageV1.getValueCount(); i++) {
+              if (dlReader.readInteger() == columnDesc.getMaxDefinitionLevel())
+                dataWriter.accept(dataReader, newValuesWriter);
+
+              rowCount = rlReader.readInteger() == 0 ? rowCount + 1 : rowCount;
+ }
+
+            BytesInput recodedBytes = BytesInput.concat(
+              BytesInput.from(originalBytes, startPos, dlEndPos - startPos), newValuesWriter.getBytes()
+            );
+
+ Encoding valuesEncoding = newValuesWriter.getEncoding();
+ int uncompressedSize = Math.toIntExact(recodedBytes.size());
+ BytesInput compressedBytes = compress(recodedBytes);
+
+ newValuesWriter.reset();
+
+ long firstRowIndex = pageV1.getFirstRowIndex().orElse(-1L);
Review Comment:
We cannot simply copy `firstRowIndex` if pages are not from the 1st row
group in this MutableMergedBlock.
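A minimal sketch of the kind of adjustment the comment is pointing at, assuming the merged block exposes how many rows it already holds; the helper and the rowsAlreadyInBlock value are illustrative, not part of the PR:
{code:java}
// A page's firstRowIndex is relative to its source row group, so once the page is
// appended to a merged block that already holds rows, the copied value must be
// shifted by that row count. -1 is kept as the "no index information" marker.
static long remapFirstRowIndex(long sourceFirstRowIndex, long rowsAlreadyInBlock) {
  return sourceFirstRowIndex < 0 ? -1L : rowsAlreadyInBlock + sourceFirstRowIndex;
}

// Usage at the commented line, with rowsAlreadyInBlock taken from the merged
// block's running row count before this source chunk is added:
long firstRowIndex = remapFirstRowIndex(pageV1.getFirstRowIndex().orElse(-1L), rowsAlreadyInBlock);
{code}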
> Add merge blocks command to parquet-tools
> -----------------------------------------
>
> Key: PARQUET-1381
> URL: https://issues.apache.org/jira/browse/PARQUET-1381
> Project: Parquet
> Issue Type: New Feature
> Components: parquet-mr
> Affects Versions: 1.10.0
> Reporter: Ekaterina Galieva
> Assignee: Ekaterina Galieva
> Priority: Major
> Labels: pull-request-available
>
> The current implementation of the merge command in parquet-tools doesn't merge
> row groups; it just places one after the other. Add an API and a command option
> to merge small blocks into larger ones up to a specified size limit.
> h6. Implementation details:
> Blocks are not reordered, so as not to break any initial predicate pushdown
> optimizations.
> Blocks are not split to fit the upper bound exactly.
> This is an intentional performance optimization.
> This makes it possible to form new blocks by copying the full content of
> smaller blocks column by column, not row by row.
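> A minimal sketch of driving the merge through the {{RowGroupMerger}} API from the PR, assuming the input paths are already known; the codec choice, the writer construction, and the 256 MB limit (matching the {{-l 256}} examples below) are illustrative:
> {code:java}
> Configuration conf = new Configuration();
> List<ParquetFileReader> readers = new ArrayList<>();
> for (Path input : inputPaths) { // inputPaths: the small files to merge (assumed)
>   readers.add(ParquetFileReader.open(HadoopInputFile.fromPath(input, conf)));
> }
> MessageType schema = readers.get(0).getFooter().getFileMetaData().getSchema();
>
> ParquetFileWriter writer = new ParquetFileWriter(
>     HadoopOutputFile.fromPath(outputPath, conf), schema,
>     ParquetFileWriter.Mode.CREATE, 128 * 1024 * 1024, 8 * 1024 * 1024);
> writer.start();
>
> // equivalent of "merge -b -l 256": re-pack row groups up to a 256 MB limit
> new RowGroupMerger(schema, CompressionCodecName.SNAPPY, false)
>     .merge(readers, 256L * 1024 * 1024, writer);
>
> writer.end(Collections.emptyMap());
> {code}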
> h6. Examples:
> # Input files with blocks sizes:
> {code:java}
> [128 | 35], [128 | 40], [120]{code}
> Expected output file blocks sizes:
> {{merge }}
> {code:java}
> [128 | 35 | 128 | 40 | 120]
> {code}
> {{merge -b}}
> {code:java}
> [128 | 35 | 128 | 40 | 120]
> {code}
> {{merge -b -l 256 }}
> {code:java}
> [163 | 168 | 120]
> {code}
> # Input files with blocks sizes:
> {code:java}
> [128 | 35], [40], [120], [6] {code}
> Expected output file blocks sizes:
> {{merge}}
> {code:java}
> [128 | 35 | 40 | 120 | 6]
> {code}
> {{merge -b}}
> {code:java}
> [128 | 75 | 126]
> {code}
> {{merge -b -l 256}}
> {code:java}
> [203 | 126]{code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)