This is an automated email from the ASF dual-hosted git repository. gabor pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push: new 79e29d44b PARQUET-2230: Add a new rewrite command powered by ParquetRewriter (#1034) 79e29d44b is described below commit 79e29d44b7994bf817e4ef272dcfd54bf1c89618 Author: Gang Wu <ust...@gmail.com> AuthorDate: Mon Feb 27 15:43:29 2023 +0800 PARQUET-2230: Add a new rewrite command powered by ParquetRewriter (#1034) --- parquet-cli/README.md | 2 + .../src/main/java/org/apache/parquet/cli/Main.java | 2 + .../parquet/cli/commands/PruneColumnsCommand.java | 2 +- .../parquet/cli/commands/RewriteCommand.java | 131 +++++++++++++++++++++ .../parquet/cli/commands/RewriteCommandTest.java | 41 +++++++ 5 files changed, 177 insertions(+), 1 deletion(-) diff --git a/parquet-cli/README.md b/parquet-cli/README.md index 73e512a0d..b3324778a 100644 --- a/parquet-cli/README.md +++ b/parquet-cli/README.md @@ -117,6 +117,8 @@ Usage: parquet [options] [command] [command options] Check bloom filters for a Parquet column scan Scan all records from a file + rewrite + Rewrite one or more Parquet files to a new Parquet file Examples: diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/Main.java b/parquet-cli/src/main/java/org/apache/parquet/cli/Main.java index d1da6ee00..5cb282e64 100644 --- a/parquet-cli/src/main/java/org/apache/parquet/cli/Main.java +++ b/parquet-cli/src/main/java/org/apache/parquet/cli/Main.java @@ -34,6 +34,7 @@ import org.apache.parquet.cli.commands.ConvertCSVCommand; import org.apache.parquet.cli.commands.ConvertCommand; import org.apache.parquet.cli.commands.ParquetMetadataCommand; import org.apache.parquet.cli.commands.PruneColumnsCommand; +import org.apache.parquet.cli.commands.RewriteCommand; import org.apache.parquet.cli.commands.ScanCommand; import org.apache.parquet.cli.commands.SchemaCommand; import org.apache.parquet.cli.commands.ShowBloomFilterCommand; @@ -103,6 +104,7 @@ public class Main extends Configured implements Tool { jc.addCommand("footer", new 
ShowFooterCommand(console)); jc.addCommand("bloom-filter", new ShowBloomFilterCommand(console)); jc.addCommand("scan", new ScanCommand(console)); + jc.addCommand("rewrite", new RewriteCommand(console)); } @Override diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/PruneColumnsCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/PruneColumnsCommand.java index 3c238f32a..7a832651b 100644 --- a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/PruneColumnsCommand.java +++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/PruneColumnsCommand.java @@ -52,7 +52,7 @@ public class PruneColumnsCommand extends BaseCommand { @Parameter( names = {"-c", "--columns"}, - description = "<columns to be replaced with masked value>", + description = "<columns to be pruned>", required = true) List<String> cols; diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/RewriteCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/RewriteCommand.java new file mode 100644 index 000000000..b507b0def --- /dev/null +++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/RewriteCommand.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.cli.commands; + +import com.beust.jcommander.Parameter; +import com.beust.jcommander.Parameters; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.cli.BaseCommand; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.hadoop.rewrite.MaskMode; +import org.apache.parquet.hadoop.rewrite.ParquetRewriter; +import org.apache.parquet.hadoop.rewrite.RewriteOptions; +import org.slf4j.Logger; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@Parameters(commandDescription = "Rewrite one or more Parquet files to a new Parquet file") +public class RewriteCommand extends BaseCommand { + + @Parameter( + names = {"-i", "--input"}, + description = "<comma-separated text of input parquet file paths>", + required = true) + List<String> inputs; + @Parameter( + names = {"-o", "--output"}, + description = "<output parquet file path>", + required = true) + String output; + @Parameter( + names = {"--mask-mode"}, + description = "<mask mode: nullify>", + required = false) + String maskMode; + @Parameter( + names = {"--mask-columns"}, + description = "<columns to be replaced with masked value>", + required = false) + List<String> maskColumns; + + @Parameter( + names = {"--prune-columns"}, + description = "<columns to be pruned>", + required = false) + List<String> pruneColumns; + + @Parameter( + names = {"-c", "--compression-codec"}, + description = "<new compression codec>", + required = false) + String codec; + + public RewriteCommand(Logger console) { + super(console); + } + + private RewriteOptions buildOptionsOrFail() { + Preconditions.checkArgument(inputs != null && !inputs.isEmpty() && output != null,
"Both input and output parquet file paths are required."); + + List<Path> inputPaths = new ArrayList<>(); + for (String input : inputs) { + inputPaths.add(new Path(input)); + } + Path outputPath = new Path(output); + + // The builder below takes the job to validate all input parameters. + RewriteOptions.Builder builder = new RewriteOptions.Builder(getConf(), inputPaths, outputPath); + + // Mask columns if specified; fail fast instead of silently leaving requested columns unmasked. + Preconditions.checkArgument(maskColumns == null || maskColumns.isEmpty() || "nullify".equals(maskMode), "--mask-columns requires --mask-mode nullify, got: %s", maskMode); + if (maskMode != null && maskMode.equals("nullify") && maskColumns != null && !maskColumns.isEmpty()) { + Map<String, MaskMode> maskModeMap = new HashMap<>(); + for (String maskColumn : maskColumns) { + maskModeMap.put(maskColumn, MaskMode.NULLIFY); + } + builder.mask(maskModeMap); + } + + // Prune columns if specified. + if (pruneColumns != null && !pruneColumns.isEmpty()) { + builder.prune(pruneColumns); + } + + if (codec != null) { + CompressionCodecName codecName = CompressionCodecName.fromConf(codec); // case-insensitive: snappy or SNAPPY + builder.transform(codecName); + } + + return builder.build(); + } + + @Override + public int run() throws IOException { + RewriteOptions options = buildOptionsOrFail(); + try (ParquetRewriter rewriter = new ParquetRewriter(options)) { // close() runs even if processBlocks() throws + rewriter.processBlocks(); + } + return 0; + } + + @Override + public List<String> getExamples() { + return Lists.newArrayList( + "# Rewrite one or more Parquet files to a new Parquet file", + "-i input.parquet -o output.parquet --mask-mode nullify --mask-columns col1_name" + ); + } +} diff --git a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/RewriteCommandTest.java b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/RewriteCommandTest.java new file mode 100644 index 000000000..2d87f8fd6 --- /dev/null +++ b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/RewriteCommandTest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.cli.commands; + +import org.apache.hadoop.conf.Configuration; +import org.junit.Assert; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; + +public class RewriteCommandTest extends ParquetFileTest { + @Test + public void testRewriteCommand() throws IOException { + File source = parquetFile(); + File target = new File(getTempFolder(), "converted.parquet"); + RewriteCommand cmd = new RewriteCommand(createLogger()); + cmd.setConf(new Configuration()); + cmd.inputs = Collections.singletonList(source.getAbsolutePath()); + cmd.output = target.getAbsolutePath(); + Assert.assertEquals(0, cmd.run()); + Assert.assertTrue(target.exists()); + } +}