This is an automated email from the ASF dual-hosted git repository.
gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new 79e29d44b PARQUET-2230: Add a new rewrite command powered by ParquetRewriter (#1034)
79e29d44b is described below
commit 79e29d44b7994bf817e4ef272dcfd54bf1c89618
Author: Gang Wu <[email protected]>
AuthorDate: Mon Feb 27 15:43:29 2023 +0800
PARQUET-2230: Add a new rewrite command powered by ParquetRewriter (#1034)
---
parquet-cli/README.md | 2 +
.../src/main/java/org/apache/parquet/cli/Main.java | 2 +
.../parquet/cli/commands/PruneColumnsCommand.java | 2 +-
.../parquet/cli/commands/RewriteCommand.java | 131 +++++++++++++++++++++
.../parquet/cli/commands/RewriteCommandTest.java | 41 +++++++
5 files changed, 177 insertions(+), 1 deletion(-)
diff --git a/parquet-cli/README.md b/parquet-cli/README.md
index 73e512a0d..b3324778a 100644
--- a/parquet-cli/README.md
+++ b/parquet-cli/README.md
@@ -117,6 +117,8 @@ Usage: parquet [options] [command] [command options]
Check bloom filters for a Parquet column
scan
Scan all records from a file
+ rewrite
+ Rewrite one or more Parquet files to a new Parquet file
Examples:
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/Main.java b/parquet-cli/src/main/java/org/apache/parquet/cli/Main.java
index d1da6ee00..5cb282e64 100644
--- a/parquet-cli/src/main/java/org/apache/parquet/cli/Main.java
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/Main.java
@@ -34,6 +34,7 @@ import org.apache.parquet.cli.commands.ConvertCSVCommand;
import org.apache.parquet.cli.commands.ConvertCommand;
import org.apache.parquet.cli.commands.ParquetMetadataCommand;
import org.apache.parquet.cli.commands.PruneColumnsCommand;
+import org.apache.parquet.cli.commands.RewriteCommand;
import org.apache.parquet.cli.commands.ScanCommand;
import org.apache.parquet.cli.commands.SchemaCommand;
import org.apache.parquet.cli.commands.ShowBloomFilterCommand;
@@ -103,6 +104,7 @@ public class Main extends Configured implements Tool {
jc.addCommand("footer", new ShowFooterCommand(console));
jc.addCommand("bloom-filter", new ShowBloomFilterCommand(console));
jc.addCommand("scan", new ScanCommand(console));
+ jc.addCommand("rewrite", new RewriteCommand(console));
}
@Override
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/PruneColumnsCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/PruneColumnsCommand.java
index 3c238f32a..7a832651b 100644
--- a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/PruneColumnsCommand.java
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/PruneColumnsCommand.java
@@ -52,7 +52,7 @@ public class PruneColumnsCommand extends BaseCommand {
@Parameter(
names = {"-c", "--columns"},
- description = "<columns to be replaced with masked value>",
+ description = "<columns to be pruned>",
required = true)
List<String> cols;
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/RewriteCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/RewriteCommand.java
new file mode 100644
index 000000000..b507b0def
--- /dev/null
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/RewriteCommand.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.cli.commands;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.cli.BaseCommand;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.rewrite.MaskMode;
+import org.apache.parquet.hadoop.rewrite.ParquetRewriter;
+import org.apache.parquet.hadoop.rewrite.RewriteOptions;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Parameters(commandDescription = "Rewrite one or more Parquet files to a new
Parquet file")
+public class RewriteCommand extends BaseCommand {
+
+ @Parameter(
+ names = {"-i", "--input"},
+ description = "<comma-separated text of input parquet file paths>",
+ required = true)
+ List<String> inputs;
+ @Parameter(
+ names = {"-o", "--output"},
+ description = "<output parquet file path>",
+ required = true)
+ String output;
+ @Parameter(
+ names = {"--mask-mode"},
+ description = "<mask mode: nullify>",
+ required = false)
+ String maskMode;
+ @Parameter(
+ names = {"--mask-columns"},
+ description = "<columns to be replaced with masked value>",
+ required = false)
+ List<String> maskColumns;
+
+ @Parameter(
+ names = {"--prune-columns"},
+ description = "<columns to be replaced with masked value>",
+ required = false)
+ List<String> pruneColumns;
+
+ @Parameter(
+ names = {"-c", "--compression-codec"},
+ description = "<new compression codec",
+ required = false)
+ String codec;
+
+ public RewriteCommand(Logger console) {
+ super(console);
+ }
+
+ private RewriteOptions buildOptionsOrFail() {
+ Preconditions.checkArgument(inputs != null && !inputs.isEmpty() && output
!= null,
+ "Both input and output parquet file paths are required.");
+
+ List<Path> inputPaths = new ArrayList<>();
+ for (String input : inputs) {
+ inputPaths.add(new Path(input));
+ }
+ Path outputPath = new Path(output);
+
+ // The builder below takes the job to validate all input parameters.
+ RewriteOptions.Builder builder = new RewriteOptions.Builder(getConf(),
inputPaths, outputPath);
+
+ // Mask columns if specified.
+ if (maskMode != null && maskMode.equals("nullify") && maskColumns != null
&& !maskColumns.isEmpty()) {
+ Map<String, MaskMode> maskModeMap = new HashMap<>();
+ for (String maskColumn : maskColumns) {
+ maskModeMap.put(maskColumn, MaskMode.NULLIFY);
+ }
+ builder.mask(maskModeMap);
+ }
+
+ // Prune columns if specified.
+ if (pruneColumns != null && !pruneColumns.isEmpty()) {
+ builder.prune(pruneColumns);
+ }
+
+ if (codec != null) {
+ CompressionCodecName codecName = CompressionCodecName.valueOf(codec);
+ builder.transform(codecName);
+ }
+
+ return builder.build();
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public int run() throws IOException {
+ RewriteOptions options = buildOptionsOrFail();
+ ParquetRewriter rewriter = new ParquetRewriter(options);
+ rewriter.processBlocks();
+ rewriter.close();
+ return 0;
+ }
+
+ @Override
+ public List<String> getExamples() {
+ return Lists.newArrayList(
+ "# Rewrite one or more Parquet files to a new Parquet file",
+ "-i input.parquet -o output.parquet --mask-mode nullify
--mask-columns col1_name"
+ );
+ }
+}
diff --git a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/RewriteCommandTest.java b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/RewriteCommandTest.java
new file mode 100644
index 000000000..2d87f8fd6
--- /dev/null
+++ b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/RewriteCommandTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.cli.commands;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+
+public class RewriteCommandTest extends ParquetFileTest {
+ @Test
+ public void testRewriteCommand() throws IOException {
+ File file = parquetFile();
+ RewriteCommand command = new RewriteCommand(createLogger());
+ command.inputs = Arrays.asList(file.getAbsolutePath());
+ File output = new File(getTempFolder(), "converted.parquet");
+ command.output = output.getAbsolutePath();
+ command.setConf(new Configuration());
+ Assert.assertEquals(0, command.run());
+ Assert.assertTrue(output.exists());
+ }
+}