[ 
https://issues.apache.org/jira/browse/PARQUET-1381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17744447#comment-17744447
 ] 

ASF GitHub Bot commented on PARQUET-1381:
-----------------------------------------

wgtmac commented on code in PR #1121:
URL: https://github.com/apache/parquet-mr/pull/1121#discussion_r1267547118


##########
parquet-cli/src/main/java/org/apache/parquet/cli/commands/RewriteCommand.java:
##########
@@ -72,6 +72,18 @@ public class RewriteCommand extends BaseCommand {
           required = false)
   String codec;
 
+  @Parameter(
+    names = {"-m", "--merge-rowgroups"},
+    description = "<true/false>",
+    required = false)
+  boolean mergeRowGroups;
+
+  @Parameter(
+    names = {"-s", "--max-rowgroup-size"},
+    description = "<max size of the merged rowgroups>",

Review Comment:
   It would be good to say in the description that it is used together with 
`--merge-rowgroups=true`.



##########
parquet-cli/src/main/java/org/apache/parquet/cli/commands/RewriteCommand.java:
##########
@@ -72,6 +72,18 @@ public class RewriteCommand extends BaseCommand {
           required = false)
   String codec;
 
+  @Parameter(
+    names = {"-m", "--merge-rowgroups"},
+    description = "<true/false>",

Review Comment:
   Could you please add a brief description?



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -751,6 +767,27 @@ public GroupConverter asGroupConverter() {
     }
   }
 
+  private void mergeRowGroups() throws IOException {
+    if (null == reader) {
+      return;
+    }
+
+    boolean v2EncodingHint = meta.getBlocks().stream()
+      .flatMap(b -> b.getColumns().stream())
+      .anyMatch(chunk -> {
+        EncodingStats stats = chunk.getEncodingStats();
+        return stats != null && stats.usesV2Pages();
+      });
+
+    List<ParquetFileReader> readers = new ArrayList<>();
+    do {
+      readers.add(reader);
+      initNextReader();
+    }
+    while(reader != null);
+    new RowGroupMerger(schema, newCodecName, v2EncodingHint).merge(readers, maxRowGroupSize, writer);

Review Comment:
   I didn't review it in depth. Does it handle encryption or masking properties 
internally? 



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/RowGroupMerger.java:
##########
@@ -0,0 +1,652 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import static java.lang.String.format;
+import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.VALUES;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.AbstractMap.SimpleEntry;
+import java.util.Map.Entry;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.DataPageV2;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.util.CompressionConverter;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+
+public class RowGroupMerger {

Review Comment:
   Probably you need to relocate it into the `rewrite` package.



##########
parquet-cli/src/main/java/org/apache/parquet/cli/commands/RewriteCommand.java:
##########
@@ -108,6 +120,14 @@ private RewriteOptions buildOptionsOrFail() {
       builder.transform(codecName);
     }
 
+    if (mergeRowGroups) {
+      Preconditions.checkArgument(maxRowGroupSize > 0,
+        "If merge rowgroup is enabled, max rowgroups size should be specified");
+      Preconditions.checkArgument(null != codec,
+        "If merge rowgroup is enabled, new compression codec needs to be specified");
+      builder.enableRowGroupMerge();
+      builder.maxRowGroupSize(maxRowGroupSize);

Review Comment:
   How about using a single function, such as 
`builder.mergeRowGroups(maxRowGroupSize)`?
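
To illustrate the single-call shape suggested here, the following is a minimal sketch only. `RewriteOptionsSketch` and its fields are hypothetical stand-ins, not the actual `RewriteOptions` class in parquet-mr; the point is that one method can both enable merging and carry the size limit, so the two settings cannot get out of sync.

```java
// Hypothetical stand-in for the options class; all names here are illustrative.
final class RewriteOptionsSketch {
  final boolean mergeRowGroups;
  final long maxRowGroupSize;

  private RewriteOptionsSketch(Builder builder) {
    this.mergeRowGroups = builder.mergeRowGroups;
    this.maxRowGroupSize = builder.maxRowGroupSize;
  }

  static final class Builder {
    private boolean mergeRowGroups;
    private long maxRowGroupSize;

    // Enables merging and records the size limit in a single call,
    // rejecting a non-positive limit up front.
    Builder mergeRowGroups(long maxRowGroupSize) {
      if (maxRowGroupSize <= 0) {
        throw new IllegalArgumentException(
            "max row group size must be positive when merging row groups");
      }
      this.mergeRowGroups = true;
      this.maxRowGroupSize = maxRowGroupSize;
      return this;
    }

    RewriteOptionsSketch build() {
      return new RewriteOptionsSketch(this);
    }
  }
}
```

With a builder like this, the caller would write `builder.mergeRowGroups(maxRowGroupSize)` once, and the size check could live in the builder instead of the CLI command.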



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/RowGroupMerger.java:
##########
@@ -0,0 +1,652 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import static java.lang.String.format;
+import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.VALUES;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.util.*;

Review Comment:
   Please do not use import star.



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/RowGroupMerger.java:
##########
@@ -0,0 +1,652 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import static java.lang.String.format;
+import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.VALUES;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.AbstractMap.SimpleEntry;
+import java.util.Map.Entry;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.DataPageV2;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.util.CompressionConverter;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+
+public class RowGroupMerger {

Review Comment:
   ```suggestion
   class RowGroupMerger {
   ```
   
   It would be good not to make it public for now.





> Add merge blocks command to parquet-tools
> -----------------------------------------
>
>                 Key: PARQUET-1381
>                 URL: https://issues.apache.org/jira/browse/PARQUET-1381
>             Project: Parquet
>          Issue Type: New Feature
>          Components: parquet-mr
>    Affects Versions: 1.10.0
>            Reporter: Ekaterina Galieva
>            Assignee: Ekaterina Galieva
>            Priority: Major
>              Labels: pull-request-available
>
> The current implementation of the merge command in parquet-tools doesn't merge row 
> groups; it just places one after the other. Add an API and a command option to 
> merge small blocks into larger ones, up to a specified size limit.
> h6. Implementation details:
> Blocks are not reordered, so as not to break possible initial predicate pushdown 
> optimizations.
> Blocks are not divided to fit the upper bound perfectly. 
> This is an intentional performance optimization: 
> it allows new blocks to be formed by copying the full content of 
> smaller blocks by column, not by row.
> h6. Examples:
>  # Input files with block sizes:
> {code:java}
> [128 | 35], [128 | 40], [120]{code}
> Expected output file block sizes:
> {{merge }}
> {code:java}
> [128 | 35 | 128 | 40 | 120]
> {code}
> {{merge -b}}
> {code:java}
> [128 | 35 | 128 | 40 | 120]
> {code}
> {{merge -b -l 256 }}
> {code:java}
> [163 | 168 | 120]
> {code}
>  # Input files with block sizes:
> {code:java}
> [128 | 35], [40], [120], [6] {code}
> Expected output file block sizes:
> {{merge}}
> {code:java}
> [128 | 35 | 40 | 120 | 6] 
> {code}
> {{merge -b}}
> {code:java}
> [128 | 75 | 126] 
> {code}
> {{merge -b -l 256}}
> {code:java}
> [203 | 126]{code}
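
The block sizes in the examples above are consistent with a greedy pass that folds consecutive blocks together until the next one would exceed the limit, without reordering or splitting any input block. The sketch below reproduces only that size arithmetic. `MergePlanSketch` and `plan` are hypothetical names, not part of parquet-mr, and the limit of 128 used for plain `merge -b` is an assumption inferred from the examples, not something the issue states.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: plans merged block sizes only, it does not touch Parquet data.
final class MergePlanSketch {

  // Greedily folds consecutive block sizes into merged blocks. Input order is
  // preserved and no input block is ever split, so a single block larger than
  // maxSize is emitted on its own.
  static List<Long> plan(List<Long> blockSizes, long maxSize) {
    List<Long> merged = new ArrayList<>();
    long current = 0;
    for (long size : blockSizes) {
      // Close the current merged block if adding this one would exceed the limit.
      if (current > 0 && current + size > maxSize) {
        merged.add(current);
        current = 0;
      }
      current += size;
    }
    if (current > 0) {
      merged.add(current);
    }
    return merged;
  }

  public static void main(String[] args) {
    // First example, limit 256: prints [163, 168, 120]
    System.out.println(plan(Arrays.asList(128L, 35L, 128L, 40L, 120L), 256));
    // Second example, assumed limit 128: prints [128, 75, 126]
    System.out.println(plan(Arrays.asList(128L, 35L, 40L, 120L, 6L), 128));
    // Second example, limit 256: prints [203, 126]
    System.out.println(plan(Arrays.asList(128L, 35L, 40L, 120L, 6L), 256));
  }
}
```

Because blocks are only concatenated, never divided, a merged block can fall well short of the limit (for example 163 under a limit of 256), which matches the trade-off described in the implementation details.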



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
