[
https://issues.apache.org/jira/browse/PARQUET-1381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17744447#comment-17744447
]
ASF GitHub Bot commented on PARQUET-1381:
-----------------------------------------
wgtmac commented on code in PR #1121:
URL: https://github.com/apache/parquet-mr/pull/1121#discussion_r1267547118
##########
parquet-cli/src/main/java/org/apache/parquet/cli/commands/RewriteCommand.java:
##########
@@ -72,6 +72,18 @@ public class RewriteCommand extends BaseCommand {
required = false)
String codec;
+ @Parameter(
+ names = {"-m", "--merge-rowgroups"},
+ description = "<true/false>",
+ required = false)
+ boolean mergeRowGroups;
+
+ @Parameter(
+ names = {"-s", "--max-rowgroup-size"},
+ description = "<max size of the merged rowgroups>",
Review Comment:
It would be good to say in the description that it is used together with
`--merge-rowgroups=true`.
##########
parquet-cli/src/main/java/org/apache/parquet/cli/commands/RewriteCommand.java:
##########
@@ -72,6 +72,18 @@ public class RewriteCommand extends BaseCommand {
required = false)
String codec;
+ @Parameter(
+ names = {"-m", "--merge-rowgroups"},
+ description = "<true/false>",
Review Comment:
Could you please add a brief description?
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -751,6 +767,27 @@ public GroupConverter asGroupConverter() {
}
}
+ private void mergeRowGroups() throws IOException {
+ if (null == reader) {
+ return;
+ }
+
+ boolean v2EncodingHint = meta.getBlocks().stream()
+ .flatMap(b -> b.getColumns().stream())
+ .anyMatch(chunk -> {
+ EncodingStats stats = chunk.getEncodingStats();
+ return stats != null && stats.usesV2Pages();
+ });
+
+ List<ParquetFileReader> readers = new ArrayList<>();
+ do {
+ readers.add(reader);
+ initNextReader();
+ }
+ while(reader != null);
+ new RowGroupMerger(schema, newCodecName, v2EncodingHint).merge(readers, maxRowGroupSize, writer);
Review Comment:
I didn't review it in depth. Does it handle encryption or masking properties
internally?
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/RowGroupMerger.java:
##########
@@ -0,0 +1,652 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import static java.lang.String.format;
+import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.VALUES;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.AbstractMap.SimpleEntry;
+import java.util.Map.Entry;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.DataPageV2;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.util.CompressionConverter;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+
+public class RowGroupMerger {
Review Comment:
You probably need to move it into the `rewrite` package.
##########
parquet-cli/src/main/java/org/apache/parquet/cli/commands/RewriteCommand.java:
##########
@@ -108,6 +120,14 @@ private RewriteOptions buildOptionsOrFail() {
builder.transform(codecName);
}
+ if (mergeRowGroups) {
+ Preconditions.checkArgument(maxRowGroupSize > 0,
+ "If merge rowgroup is enabled, max rowgroups size should be specified");
+ Preconditions.checkArgument(null != codec,
+ "If merge rowgroup is enabled, new compression codec needs to be specified");
+ builder.enableRowGroupMerge();
+ builder.maxRowGroupSize(maxRowGroupSize);
Review Comment:
What about using a single function instead, like
`builder.mergeRowGroups(maxRowGroupSize)`?
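To illustrate the suggestion, here is a minimal sketch of a builder where enabling the merge and setting the size limit happen in one call. `MergeOptionsSketch` and its accessors are hypothetical names invented for this example; this is not the actual `RewriteOptions` API, and the validation shown is only an assumption about what the combined method might check.

```java
// Hypothetical stand-in for an options builder; NOT the real RewriteOptions class.
public final class MergeOptionsSketch {
    private final boolean mergeRowGroups;
    private final long maxRowGroupSize;

    private MergeOptionsSketch(Builder builder) {
        this.mergeRowGroups = builder.mergeRowGroups;
        this.maxRowGroupSize = builder.maxRowGroupSize;
    }

    public boolean isMergeRowGroups() {
        return mergeRowGroups;
    }

    public long getMaxRowGroupSize() {
        return maxRowGroupSize;
    }

    public static final class Builder {
        private boolean mergeRowGroups;
        private long maxRowGroupSize;

        // One call both enables merging and sets the limit, so a caller
        // cannot enable merging and forget to provide a size.
        public Builder mergeRowGroups(long maxRowGroupSize) {
            if (maxRowGroupSize <= 0) {
                throw new IllegalArgumentException(
                    "max row group size must be positive: " + maxRowGroupSize);
            }
            this.mergeRowGroups = true;
            this.maxRowGroupSize = maxRowGroupSize;
            return this;
        }

        public MergeOptionsSketch build() {
            return new MergeOptionsSketch(this);
        }
    }
}
```

With a shape like this, the size check could live in the builder itself, so every caller gets it and not only the CLI command.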
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/RowGroupMerger.java:
##########
@@ -0,0 +1,652 @@
+import java.util.*;
Review Comment:
Please do not use wildcard (star) imports.
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/RowGroupMerger.java:
##########
@@ -0,0 +1,652 @@
+public class RowGroupMerger {
Review Comment:
```suggestion
class RowGroupMerger {
```
It would be good not to make it public for now.
> Add merge blocks command to parquet-tools
> -----------------------------------------
>
> Key: PARQUET-1381
> URL: https://issues.apache.org/jira/browse/PARQUET-1381
> Project: Parquet
> Issue Type: New Feature
> Components: parquet-mr
> Affects Versions: 1.10.0
> Reporter: Ekaterina Galieva
> Assignee: Ekaterina Galieva
> Priority: Major
> Labels: pull-request-available
>
> The current implementation of the merge command in parquet-tools doesn't merge
> row groups; it just places them one after the other. Add an API and a command
> option to merge small blocks into larger ones, up to a specified size limit.
> h6. Implementation details:
> Blocks are not reordered, so as not to break possible initial predicate
> pushdown optimizations.
> Blocks are not split to fit the upper bound exactly.
> This is an intentional performance optimization:
> it allows new blocks to be formed by copying the full content of
> smaller blocks column by column, rather than row by row.
> h6. Examples:
> # Input files with block sizes:
> {code:java}
> [128 | 35], [128 | 40], [120]{code}
> Expected output file block sizes:
> {{merge }}
> {code:java}
> [128 | 35 | 128 | 40 | 120]
> {code}
> {{merge -b}}
> {code:java}
> [128 | 35 | 128 | 40 | 120]
> {code}
> {{merge -b -l 256 }}
> {code:java}
> [163 | 168 | 120]
> {code}
> # Input files with block sizes:
> {code:java}
> [128 | 35], [40], [120], [6] {code}
> Expected output file block sizes:
> {{merge}}
> {code:java}
> [128 | 35 | 40 | 120 | 6]
> {code}
> {{merge -b}}
> {code:java}
> [128 | 75 | 126]
> {code}
> {{merge -b -l 256}}
> {code:java}
> [203 | 126]{code}
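
The block sizes in the examples above are consistent with a greedy, order-preserving pass over all input blocks: keep adding the next block to the current output block until doing so would exceed the limit, then start a new output block. The sketch below reproduces the example numbers under that reading. It only plans sizes and does not touch Parquet data; `GreedyBlockMerge` and `plan` are hypothetical names, and the limit of 128 used for plain `merge -b` is inferred from the examples rather than stated in the issue.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative only: plans merged block sizes, does not read or write Parquet.
public final class GreedyBlockMerge {

    // Walks the blocks in their original order (no reordering, no splitting)
    // and closes the current output block when the next input would overflow it.
    public static List<Long> plan(List<Long> blockSizes, long limit) {
        List<Long> merged = new ArrayList<>();
        long current = 0;
        for (long size : blockSizes) {
            if (current > 0 && current + size > limit) {
                merged.add(current);
                current = size;
            } else {
                current += size;
            }
        }
        if (current > 0) {
            merged.add(current);
        }
        return merged;
    }

    public static void main(String[] args) {
        // Blocks of all input files, flattened in file order.
        List<Long> first = Arrays.asList(128L, 35L, 128L, 40L, 120L);
        List<Long> second = Arrays.asList(128L, 35L, 40L, 120L, 6L);

        System.out.println(plan(first, 256));  // [163, 168, 120]
        System.out.println(plan(second, 128)); // [128, 75, 126] (assumed default limit)
        System.out.println(plan(second, 256)); // [203, 126]
    }
}
```

Because whole blocks are never split, an output block can end up well below the limit (for example the trailing `120` in the first case), which matches the "not divided to fit upper bound perfectly" note in the description.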