This is an automated email from the ASF dual-hosted git repository.

gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new 79e29d44b PARQUET-2230: Add a new rewrite command powered by 
ParquetRewriter (#1034)
79e29d44b is described below

commit 79e29d44b7994bf817e4ef272dcfd54bf1c89618
Author: Gang Wu <ust...@gmail.com>
AuthorDate: Mon Feb 27 15:43:29 2023 +0800

    PARQUET-2230: Add a new rewrite command powered by ParquetRewriter (#1034)
---
 parquet-cli/README.md                              |   2 +
 .../src/main/java/org/apache/parquet/cli/Main.java |   2 +
 .../parquet/cli/commands/PruneColumnsCommand.java  |   2 +-
 .../parquet/cli/commands/RewriteCommand.java       | 131 +++++++++++++++++++++
 .../parquet/cli/commands/RewriteCommandTest.java   |  41 +++++++
 5 files changed, 177 insertions(+), 1 deletion(-)

diff --git a/parquet-cli/README.md b/parquet-cli/README.md
index 73e512a0d..b3324778a 100644
--- a/parquet-cli/README.md
+++ b/parquet-cli/README.md
@@ -117,6 +117,8 @@ Usage: parquet [options] [command] [command options]
         Check bloom filters for a Parquet column
     scan
         Scan all records from a file
+    rewrite
+        Rewrite one or more Parquet files to a new Parquet file
 
   Examples:
 
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/Main.java 
b/parquet-cli/src/main/java/org/apache/parquet/cli/Main.java
index d1da6ee00..5cb282e64 100644
--- a/parquet-cli/src/main/java/org/apache/parquet/cli/Main.java
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/Main.java
@@ -34,6 +34,7 @@ import org.apache.parquet.cli.commands.ConvertCSVCommand;
 import org.apache.parquet.cli.commands.ConvertCommand;
 import org.apache.parquet.cli.commands.ParquetMetadataCommand;
 import org.apache.parquet.cli.commands.PruneColumnsCommand;
+import org.apache.parquet.cli.commands.RewriteCommand;
 import org.apache.parquet.cli.commands.ScanCommand;
 import org.apache.parquet.cli.commands.SchemaCommand;
 import org.apache.parquet.cli.commands.ShowBloomFilterCommand;
@@ -103,6 +104,7 @@ public class Main extends Configured implements Tool {
     jc.addCommand("footer", new ShowFooterCommand(console));
     jc.addCommand("bloom-filter", new ShowBloomFilterCommand(console));
     jc.addCommand("scan", new ScanCommand(console));
+    jc.addCommand("rewrite", new RewriteCommand(console));
   }
 
   @Override
diff --git 
a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/PruneColumnsCommand.java
 
b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/PruneColumnsCommand.java
index 3c238f32a..7a832651b 100644
--- 
a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/PruneColumnsCommand.java
+++ 
b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/PruneColumnsCommand.java
@@ -52,7 +52,7 @@ public class PruneColumnsCommand extends BaseCommand {
 
   @Parameter(
     names = {"-c", "--columns"},
-    description = "<columns to be replaced with masked value>",
+    description = "<columns to be pruned>",
     required = true)
   List<String> cols;
 
diff --git 
a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/RewriteCommand.java 
b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/RewriteCommand.java
new file mode 100644
index 000000000..b507b0def
--- /dev/null
+++ 
b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/RewriteCommand.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.cli.commands;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.cli.BaseCommand;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.rewrite.MaskMode;
+import org.apache.parquet.hadoop.rewrite.ParquetRewriter;
+import org.apache.parquet.hadoop.rewrite.RewriteOptions;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+/**
+ * CLI command that rewrites one or more Parquet files to a new Parquet file.
+ *
+ * <p>Depending on the options given, the rewrite can nullify selected columns,
+ * prune selected columns and/or re-encode the data with a different compression
+ * codec. The actual work is delegated to {@link ParquetRewriter}, configured
+ * through {@link RewriteOptions}.
+ */
+@Parameters(commandDescription = "Rewrite one or more Parquet files to a new Parquet file")
+public class RewriteCommand extends BaseCommand {
+
+  @Parameter(
+          names = {"-i", "--input"},
+          description = "<comma-separated text of input parquet file paths>",
+          required = true)
+  List<String> inputs;
+
+  @Parameter(
+          names = {"-o", "--output"},
+          description = "<output parquet file path>",
+          required = true)
+  String output;
+
+  @Parameter(
+          names = {"--mask-mode"},
+          description = "<mask mode: nullify>",
+          required = false)
+  String maskMode;
+
+  @Parameter(
+          names = {"--mask-columns"},
+          description = "<columns to be replaced with masked value>",
+          required = false)
+  List<String> maskColumns;
+
+  @Parameter(
+          names = {"--prune-columns"},
+          description = "<columns to be pruned>",
+          required = false)
+  List<String> pruneColumns;
+
+  @Parameter(
+          names = {"-c", "--compression-codec"},
+          description = "<new compression codec>",
+          required = false)
+  String codec;
+
+  public RewriteCommand(Logger console) {
+    super(console);
+  }
+
+  /**
+   * Translates the parsed command-line fields into a {@link RewriteOptions} instance.
+   *
+   * @return the options to hand to {@link ParquetRewriter}
+   * @throws IllegalArgumentException if no input or no output path was supplied, or if
+   *         the requested compression codec is not a {@link CompressionCodecName} constant
+   */
+  private RewriteOptions buildOptionsOrFail() {
+    Preconditions.checkArgument(inputs != null && !inputs.isEmpty() && output != null,
+            "Both input and output parquet file paths are required.");
+
+    List<Path> inputPaths = new ArrayList<>();
+    for (String input : inputs) {
+      inputPaths.add(new Path(input));
+    }
+    Path outputPath = new Path(output);
+
+    // The builder below takes the job to validate all input parameters.
+    RewriteOptions.Builder builder =
+            new RewriteOptions.Builder(getConf(), inputPaths, outputPath);
+
+    // Mask columns if specified. Only the "nullify" mode is handled here; any other
+    // mask mode (or a mode without columns) leaves the columns unmasked.
+    if ("nullify".equals(maskMode) && maskColumns != null && !maskColumns.isEmpty()) {
+      Map<String, MaskMode> maskModeMap = new HashMap<>();
+      for (String maskColumn : maskColumns) {
+        maskModeMap.put(maskColumn, MaskMode.NULLIFY);
+      }
+      builder.mask(maskModeMap);
+    }
+
+    // Prune columns if specified.
+    if (pruneColumns != null && !pruneColumns.isEmpty()) {
+      builder.prune(pruneColumns);
+    }
+
+    // Translate to a new codec if specified. Enum constants are upper case, so the
+    // user-supplied name is normalized first to accept e.g. "snappy" as well as "SNAPPY".
+    if (codec != null) {
+      CompressionCodecName codecName =
+              CompressionCodecName.valueOf(codec.toUpperCase(Locale.ROOT));
+      builder.transform(codecName);
+    }
+
+    return builder.build();
+  }
+
+  /**
+   * Builds the rewrite options from the command-line fields, processes all blocks of
+   * the input files and closes the rewriter.
+   *
+   * @return {@code 0} when the rewrite completed without throwing
+   * @throws IOException if the rewriter fails while processing or closing
+   */
+  @Override
+  public int run() throws IOException {
+    RewriteOptions options = buildOptionsOrFail();
+    ParquetRewriter rewriter = new ParquetRewriter(options);
+    rewriter.processBlocks();
+    rewriter.close();
+    return 0;
+  }
+
+  @Override
+  public List<String> getExamples() {
+    return Lists.newArrayList(
+            "# Rewrite one or more Parquet files to a new Parquet file",
+            "-i input.parquet -o output.parquet --mask-mode nullify --mask-columns col1_name"
+    );
+  }
+}
diff --git 
a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/RewriteCommandTest.java
 
b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/RewriteCommandTest.java
new file mode 100644
index 000000000..2d87f8fd6
--- /dev/null
+++ 
b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/RewriteCommandTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.cli.commands;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+
+public class RewriteCommandTest extends ParquetFileTest {
+  @Test
+  public void testRewriteCommand() throws IOException {
+    File file = parquetFile();
+    RewriteCommand command = new RewriteCommand(createLogger());
+    command.inputs = Arrays.asList(file.getAbsolutePath());
+    File output = new File(getTempFolder(), "converted.parquet");
+    command.output = output.getAbsolutePath();
+    command.setConf(new Configuration());
+    Assert.assertEquals(0, command.run());
+    Assert.assertTrue(output.exists());
+  }
+}

Reply via email to