This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a78a0166 PARQUET-2380: Decouple rewriter from Hadoop (#1195)
1a78a0166 is described below

commit 1a78a0166c748d81b7f19c922023be1e7dffc7b6
Author: Atour <[email protected]>
AuthorDate: Tue Nov 21 06:26:03 2023 +0100

    PARQUET-2380: Decouple rewriter from Hadoop (#1195)
    
    Extracts repeated code in the updated test fixture and restores a deleted method that japicmp failed to detect.
---
 .../parquet/hadoop/rewrite/ParquetRewriter.java    |  37 +++---
 .../parquet/hadoop/rewrite/RewriteOptions.java     | 136 ++++++++++++++++++---
 .../parquet/hadoop/util/HadoopInputFile.java       |   8 ++
 .../parquet/hadoop/util/HadoopOutputFile.java      |  10 +-
 .../hadoop/rewrite/ParquetRewriterTest.java        |  70 +++++++----
 5 files changed, 205 insertions(+), 56 deletions(-)
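
For a sense of the new API surface this change enables, here is a minimal usage
sketch. The file paths are hypothetical, and it assumes the existing
ParquetRewriter.processBlocks() entry point; the Hadoop-backed
InputFile/OutputFile implementations stand in for any other implementation of
those interfaces:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.conf.HadoopParquetConfiguration;
    import org.apache.parquet.conf.ParquetConfiguration;
    import org.apache.parquet.hadoop.metadata.CompressionCodecName;
    import org.apache.parquet.hadoop.rewrite.ParquetRewriter;
    import org.apache.parquet.hadoop.rewrite.RewriteOptions;
    import org.apache.parquet.hadoop.util.HadoopInputFile;
    import org.apache.parquet.hadoop.util.HadoopOutputFile;
    import org.apache.parquet.io.InputFile;
    import org.apache.parquet.io.OutputFile;

    public class RewriteSketch {
      public static void main(String[] args) throws IOException {
        Configuration hadoopConf = new Configuration();
        ParquetConfiguration conf = new HadoopParquetConfiguration(hadoopConf);
        // Hypothetical input/output locations.
        InputFile in = HadoopInputFile.fromPathUnchecked(new Path("/tmp/in.parquet"), hadoopConf);
        OutputFile out = HadoopOutputFile.fromPathUnchecked(new Path("/tmp/out.parquet"), hadoopConf);
        // The new builder overload takes the generic abstractions directly.
        RewriteOptions options = new RewriteOptions.Builder(conf, in, out)
            .transform(CompressionCodecName.ZSTD) // optional codec translation
            .build();
        try (ParquetRewriter rewriter = new ParquetRewriter(options)) {
          rewriter.processBlocks(); // rewrites all row groups to the output file
        }
      }
    }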

diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
index 004a1d135..0de0009f5 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
@@ -19,8 +19,7 @@
 package org.apache.parquet.hadoop.rewrite;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.Preconditions;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.column.ColumnDescriptor;
@@ -34,6 +33,7 @@ import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.column.statistics.Statistics;
 import org.apache.parquet.column.values.bloomfilter.BloomFilter;
 import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.parquet.crypto.AesCipher;
 import org.apache.parquet.crypto.InternalColumnEncryptionSetup;
 import org.apache.parquet.crypto.InternalFileEncryptor;
@@ -54,10 +54,10 @@ import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.hadoop.util.CompressionConverter.TransParquetFileReader;
 import org.apache.parquet.hadoop.util.HadoopCodecs;
-import org.apache.parquet.hadoop.util.HadoopInputFile;
-import org.apache.parquet.hadoop.util.HadoopOutputFile;
 import org.apache.parquet.internal.column.columnindex.ColumnIndex;
 import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.OutputFile;
 import org.apache.parquet.io.ParquetEncodingException;
 import org.apache.parquet.io.api.Converter;
 import org.apache.parquet.io.api.GroupConverter;
@@ -96,7 +96,7 @@ public class ParquetRewriter implements Closeable {
   private final int pageBufferSize = ParquetProperties.DEFAULT_PAGE_SIZE * 2;
   private final byte[] pageBuffer = new byte[pageBufferSize];
   // Configurations for the new file
-  private CompressionCodecName newCodecName = null;
+  private final CompressionCodecName newCodecName;
   private Map<ColumnPath, MaskMode> maskColumns = null;
   private Set<ColumnPath> encryptColumns = null;
   private boolean encryptMode = false;
@@ -122,11 +122,11 @@ public class ParquetRewriter implements Closeable {
   private final IndexCache.CacheStrategy indexCacheStrategy;
 
   public ParquetRewriter(RewriteOptions options) throws IOException {
-    Configuration conf = options.getConf();
-    Path outPath = options.getOutputFile();
-    openInputFiles(options.getInputFiles(), conf);
+    ParquetConfiguration conf = options.getParquetConfiguration();
+    OutputFile out = options.getParquetOutputFile();
+    openInputFiles(options.getParquetInputFiles(), conf);
     LOG.info("Start rewriting {} input file(s) {} to {}",
-      inputFiles.size(), options.getInputFiles(), outPath);
+      inputFiles.size(), options.getParquetInputFiles(), out);
 
     // Init reader of the first input file
     initNextReader();
@@ -165,7 +165,7 @@ public class ParquetRewriter implements Closeable {
     this.indexCacheStrategy = options.getIndexCacheStrategy();
 
     ParquetFileWriter.Mode writerMode = ParquetFileWriter.Mode.CREATE;
-    writer = new ParquetFileWriter(HadoopOutputFile.fromPath(outPath, conf), schema, writerMode,
+    writer = new ParquetFileWriter(out, schema, writerMode,
             DEFAULT_BLOCK_SIZE, MAX_PADDING_SIZE_DEFAULT, DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH,
             DEFAULT_STATISTICS_TRUNCATE_LENGTH, ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED,
             options.getFileEncryptionProperties());
@@ -201,13 +201,13 @@ public class ParquetRewriter implements Closeable {
   }
 
   // Open all input files to validate their schemas are compatible to merge
-  private void openInputFiles(List<Path> inputFiles, Configuration conf) {
+  private void openInputFiles(List<InputFile> inputFiles, ParquetConfiguration conf) {
     Preconditions.checkArgument(inputFiles != null && !inputFiles.isEmpty(), "No input files");
 
-    for (Path inputFile : inputFiles) {
+    for (InputFile inputFile : inputFiles) {
       try {
         TransParquetFileReader reader = new TransParquetFileReader(
-                HadoopInputFile.fromPath(inputFile, conf), HadoopReadOptions.builder(conf).build());
+                inputFile, ParquetReadOptions.builder(conf).build());
         MessageType inputFileSchema = reader.getFooter().getFileMetaData().getSchema();
         if (this.schema == null) {
           this.schema = inputFileSchema;
@@ -623,8 +623,7 @@ public class ParquetRewriter implements Closeable {
     List<Type> fields = schema.getFields();
     List<String> currentPath = new ArrayList<>();
     List<Type> prunedFields = pruneColumnsInFields(fields, currentPath, prunePaths);
-    MessageType newSchema = new MessageType(schema.getName(), prunedFields);
-    return newSchema;
+    return new MessageType(schema.getName(), prunedFields);
   }
 
   private List<Type> pruneColumnsInFields(List<Type> fields, List<String> currentPath, Set<ColumnPath> prunePaths) {
@@ -797,10 +796,10 @@ public class ParquetRewriter implements Closeable {
     private final BlockCipher.Encryptor metaDataEncryptor;
     private final byte[] fileAAD;
 
-    private byte[] dataPageHeaderAAD;
-    private byte[] dataPageAAD;
-    private byte[] dictPageHeaderAAD;
-    private byte[] dictPageAAD;
+    private final byte[] dataPageHeaderAAD;
+    private final byte[] dataPageAAD;
+    private final byte[] dictPageHeaderAAD;
+    private final byte[] dictPageAAD;
 
     public ColumnChunkEncryptorRunTime(InternalFileEncryptor fileEncryptor,
                                        ColumnChunkMetaData chunk,
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java
index 5bdc8d590..abe57e02d 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java
@@ -21,22 +21,31 @@ package org.apache.parquet.hadoop.rewrite;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.Preconditions;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.parquet.crypto.FileEncryptionProperties;
 import org.apache.parquet.hadoop.IndexCache;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.util.ConfigurationUtil;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.OutputFile;
 
-import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * A set of options to create a ParquetRewriter.
  */
 public class RewriteOptions {
 
-  private final Configuration conf;
-  private final List<Path> inputFiles;
-  private final Path outputFile;
+  private final ParquetConfiguration conf;
+  private final List<InputFile> inputFiles;
+  private final OutputFile outputFile;
   private final List<String> pruneColumns;
   private final CompressionCodecName newCodecName;
   private final Map<String, MaskMode> maskColumns;
@@ -44,9 +53,9 @@ public class RewriteOptions {
   private final FileEncryptionProperties fileEncryptionProperties;
   private final IndexCache.CacheStrategy indexCacheStrategy;
 
-  private RewriteOptions(Configuration conf,
-                         List<Path> inputFiles,
-                         Path outputFile,
+  private RewriteOptions(ParquetConfiguration conf,
+                         List<InputFile> inputFiles,
+                         OutputFile outputFile,
                          List<String> pruneColumns,
                          CompressionCodecName newCodecName,
                          Map<String, MaskMode> maskColumns,
@@ -64,15 +73,70 @@ public class RewriteOptions {
     this.indexCacheStrategy = indexCacheStrategy;
   }
 
+  /**
+   * Gets the {@link Configuration} part of the rewrite options.
+   *
+   * @return the associated {@link Configuration}
+   */
   public Configuration getConf() {
+    return ConfigurationUtil.createHadoopConfiguration(conf);
+  }
+
+  /**
+   * Gets the {@link ParquetConfiguration} part of the rewrite options.
+   *
+   * @return the associated {@link ParquetConfiguration}
+   */
+  public ParquetConfiguration getParquetConfiguration() {
     return conf;
   }
 
+  /**
+   * Gets the input {@link Path}s for the rewrite if they exist for all input files,
+   * otherwise throws a {@link RuntimeException}.
+   *
+   * @return a {@link List} of the associated input {@link Path}s
+   */
   public List<Path> getInputFiles() {
+    return inputFiles.stream().map(f -> {
+      if (f instanceof HadoopInputFile) {
+        HadoopInputFile hadoopInputFile = (HadoopInputFile) f;
+        return hadoopInputFile.getPath();
+      } else {
+        throw new RuntimeException("The input files do not all have an associated Hadoop Path.");
+      }
+    }).collect(Collectors.toList());
+  }
+
+  /**
+   * Gets the {@link InputFile}s for the rewrite.
+   *
+   * @return a {@link List} of the associated {@link InputFile}s
+   */
+  public List<InputFile> getParquetInputFiles() {
     return inputFiles;
   }
 
+  /**
+   * Gets the output {@link Path} for the rewrite if it exists, otherwise throws a {@link RuntimeException}.
+   *
+   * @return the associated {@link Path} if it exists
+   */
   public Path getOutputFile() {
+    if (outputFile instanceof HadoopOutputFile) {
+      HadoopOutputFile hadoopOutputFile = (HadoopOutputFile) outputFile;
+      return new Path(hadoopOutputFile.getPath());
+    } else {
+      throw new RuntimeException("The output file does not have an associated 
Hadoop Path.");
+    }
+  }
+
+  /**
+   * Gets the {@link OutputFile} for the rewrite.
+   *
+   * @return the associated {@link OutputFile}
+   */
+  public OutputFile getParquetOutputFile() {
     return outputFile;
   }
 
@@ -102,9 +166,9 @@ public class RewriteOptions {
 
   // Builder to create a RewriterOptions.
   public static class Builder {
-    private final Configuration conf;
-    private final List<Path> inputFiles;
-    private final Path outputFile;
+    private final ParquetConfiguration conf;
+    private final List<InputFile> inputFiles;
+    private final OutputFile outputFile;
     private List<String> pruneColumns;
     private CompressionCodecName newCodecName;
     private Map<String, MaskMode> maskColumns;
@@ -120,9 +184,18 @@ public class RewriteOptions {
      * @param outputFile output file path to rewrite to
      */
     public Builder(Configuration conf, Path inputFile, Path outputFile) {
-      this.conf = conf;
-      this.inputFiles = Arrays.asList(inputFile);
-      this.outputFile = outputFile;
+      this(new HadoopParquetConfiguration(conf), HadoopInputFile.fromPathUnchecked(inputFile, conf), HadoopOutputFile.fromPathUnchecked(outputFile, conf));
+    }
+
+    /**
+     * Create a builder to create a RewriterOptions.
+     *
+     * @param conf       configuration for reading from input files and writing to output file
+     * @param inputFile  input file to read from
+     * @param outputFile output file to rewrite to
+     */
+    public Builder(ParquetConfiguration conf, InputFile inputFile, OutputFile outputFile) {
+      this(conf, Collections.singletonList(inputFile), outputFile);
     }
 
     /**
@@ -141,6 +214,30 @@ public class RewriteOptions {
      * @param outputFile output file path to rewrite to
      */
     public Builder(Configuration conf, List<Path> inputFiles, Path outputFile) {
+      this.conf = new HadoopParquetConfiguration(conf);
+      this.inputFiles = new ArrayList<>(inputFiles.size());
+      for (Path inputFile : inputFiles) {
+        this.inputFiles.add(HadoopInputFile.fromPathUnchecked(inputFile, conf));
+      }
+      this.outputFile = HadoopOutputFile.fromPathUnchecked(outputFile, conf);
+    }
+
+    /**
+     * Create a builder to create a RewriterOptions.
+     * <p>
+     * Please note that if merging more than one file, the schema of all files must be the same.
+     * Otherwise, the rewrite will fail.
+     * <p>
+     * The rewrite will keep original row groups from all input files. This may not be optimal
+     * if row groups are very small and will not solve small file problems. Instead, it will
+     * make it worse to have a large file footer in the output file.
+     * TODO: support rewrite by record to break the original row groups into reasonable ones.
+     *
+     * @param conf       configuration for reading from input files and writing to output file
+     * @param inputFiles list of input file paths to read from
+     * @param outputFile output file path to rewrite to
+     */
+    public Builder(ParquetConfiguration conf, List<InputFile> inputFiles, OutputFile outputFile) {
       this.conf = conf;
       this.inputFiles = inputFiles;
       this.outputFile = outputFile;
@@ -218,7 +315,18 @@ public class RewriteOptions {
      * @return self
      */
     public Builder addInputFile(Path path) {
-      this.inputFiles.add(path);
+      this.inputFiles.add(HadoopInputFile.fromPathUnchecked(path, ConfigurationUtil.createHadoopConfiguration(conf)));
+      return this;
+    }
+
+    /**
+     * Add an input file to read from.
+     *
+     * @param inputFile input file to read from
+     * @return self
+     */
+    public Builder addInputFile(InputFile inputFile) {
+      this.inputFiles.add(inputFile);
       return this;
     }
 
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java
index ad26430ba..5e799bfa8 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java
@@ -39,6 +39,14 @@ public class HadoopInputFile implements InputFile {
     return new HadoopInputFile(fs, fs.getFileStatus(path), conf);
   }
 
+  public static HadoopInputFile fromPathUnchecked(Path path, Configuration conf) {
+    try {
+      return fromPath(path, conf);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   public static HadoopInputFile fromStatus(FileStatus stat, Configuration conf)
       throws IOException {
     FileSystem fs = stat.getPath().getFileSystem(conf);
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopOutputFile.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopOutputFile.java
index 30ac50e9d..defe66620 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopOutputFile.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopOutputFile.java
@@ -54,11 +54,19 @@ public class HadoopOutputFile implements OutputFile {
   private final Configuration conf;
 
   public static HadoopOutputFile fromPath(Path path, Configuration conf)
-      throws IOException {
+    throws IOException {
     FileSystem fs = path.getFileSystem(conf);
     return new HadoopOutputFile(fs, fs.makeQualified(path), conf);
   }
 
+  public static HadoopOutputFile fromPathUnchecked(Path path, Configuration conf) {
+    try {
+      return fromPath(path, conf);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   private HadoopOutputFile(FileSystem fs, Path path, Configuration conf) {
     this.fs = fs;
     this.path = path;
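
The new *Unchecked factories appear to exist so these helpers compose with
java.util.stream: lambdas cannot throw the checked IOException that fromPath
declares. The RewriteOptions builder and the test fixture below both rely on
this pattern; a hypothetical snippet:

    // Map a list of Hadoop Paths to InputFiles inside a stream pipeline.
    List<InputFile> inputs = paths.stream()
        .map(p -> HadoopInputFile.fromPathUnchecked(p, conf)) // no checked throw
        .collect(Collectors.toList());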
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
index ab25566a9..e66486531 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
@@ -27,6 +27,8 @@ import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.Version;
 import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.conf.ParquetConfiguration;
+import org.apache.parquet.conf.PlainParquetConfiguration;
 import org.apache.parquet.crypto.FileDecryptionProperties;
 import org.apache.parquet.crypto.FileEncryptionProperties;
 import org.apache.parquet.crypto.ParquetCipher;
@@ -52,11 +54,13 @@ import org.apache.parquet.hadoop.util.CompressionConverter.TransParquetFileReader;
 import org.apache.parquet.hadoop.util.EncDecProperties;
 import org.apache.parquet.hadoop.util.EncryptionTestFile;
 import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
 import org.apache.parquet.hadoop.util.TestFileBuilder;
 import org.apache.parquet.internal.column.columnindex.ColumnIndex;
 import org.apache.parquet.internal.column.columnindex.OffsetIndex;
 import org.apache.parquet.io.InputFile;
 import org.apache.parquet.io.InvalidRecordException;
+import org.apache.parquet.io.OutputFile;
 import org.apache.parquet.io.SeekableInputStream;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.InvalidSchemaException;
@@ -99,28 +103,40 @@ public class ParquetRewriterTest {
 
   private final int numRecord = 100000;
   private final Configuration conf = new Configuration();
+  private final ParquetConfiguration parquetConf = new PlainParquetConfiguration();
   private final ParquetProperties.WriterVersion writerVersion;
   private final IndexCache.CacheStrategy indexCacheStrategy;
+  private final boolean usingHadoop;
 
   private List<EncryptionTestFile> inputFiles = null;
   private String outputFile = null;
   private ParquetRewriter rewriter = null;
 
-  @Parameterized.Parameters(name = "WriterVersion = {0}, IndexCacheStrategy = {1}")
+  @Parameterized.Parameters(name = "WriterVersion = {0}, IndexCacheStrategy = {1}, UsingHadoop = {2}")
   public static Object[][] parameters() {
-    return new Object[][] {{"v1", "NONE"}, {"v1", "PREFETCH_BLOCK"}, {"v2", "NONE"}, {"v2", "PREFETCH_BLOCK"}};
-  }
-
-  public ParquetRewriterTest(String writerVersion, String indexCacheStrategy) {
+    return new Object[][] {
+      {"v1", "NONE", true},
+      {"v1", "PREFETCH_BLOCK", true},
+      {"v2", "NONE", true},
+      {"v2", "PREFETCH_BLOCK", true},
+      {"v1", "NONE", false},
+      {"v1", "PREFETCH_BLOCK", false},
+      {"v2", "NONE", false},
+      {"v2", "PREFETCH_BLOCK", false}
+    };
+  }
+
+  public ParquetRewriterTest(String writerVersion, String indexCacheStrategy, boolean usingHadoop) {
     this.writerVersion = ParquetProperties.WriterVersion.fromString(writerVersion);
     this.indexCacheStrategy = IndexCache.CacheStrategy.valueOf(indexCacheStrategy);
+    this.usingHadoop = usingHadoop;
   }
 
   private void testPruneSingleColumnTranslateCodec(List<Path> inputPaths) throws Exception {
-    Path outputPath = new Path(outputFile);
-    List<String> pruneColumns = Arrays.asList("Gender");
+    RewriteOptions.Builder builder = createBuilder(inputPaths);
+
+    List<String> pruneColumns = Collections.singletonList("Gender");
     CompressionCodecName newCodec = CompressionCodecName.ZSTD;
-    RewriteOptions.Builder builder = new RewriteOptions.Builder(conf, inputPaths, outputPath);
     RewriteOptions options =
       builder.prune(pruneColumns).transform(newCodec).indexCacheStrategy(indexCacheStrategy).build();
 
@@ -177,12 +193,12 @@ public class ParquetRewriterTest {
   }
 
   private void testPruneNullifyTranslateCodec(List<Path> inputPaths) throws Exception {
-    Path outputPath = new Path(outputFile);
-    List<String> pruneColumns = Arrays.asList("Gender");
+    RewriteOptions.Builder builder = createBuilder(inputPaths);
+
+    List<String> pruneColumns = Collections.singletonList("Gender");
     Map<String, MaskMode> maskColumns = new HashMap<>();
     maskColumns.put("Links.Forward", MaskMode.NULLIFY);
     CompressionCodecName newCodec = CompressionCodecName.ZSTD;
-    RewriteOptions.Builder builder = new RewriteOptions.Builder(conf, inputPaths, outputPath);
     RewriteOptions options =
       builder.prune(pruneColumns).mask(maskColumns).transform(newCodec).indexCacheStrategy(indexCacheStrategy).build();
 
@@ -233,11 +249,10 @@ public class ParquetRewriterTest {
   }
 
   private void testPruneEncryptTranslateCodec(List<Path> inputPaths) throws Exception {
-    Path outputPath = new Path(outputFile);
-    RewriteOptions.Builder builder = new RewriteOptions.Builder(conf, inputPaths, outputPath);
+    RewriteOptions.Builder builder = createBuilder(inputPaths);
 
     // Prune
-    List<String> pruneColumns = Arrays.asList("Gender");
+    List<String> pruneColumns = Collections.singletonList("Gender");
     builder.prune(pruneColumns);
 
     // Translate codec
@@ -314,8 +329,7 @@ public class ParquetRewriterTest {
 
     inputFiles = inputPaths.stream().map(p -> new EncryptionTestFile(p.toString(), null)).collect(Collectors.toList());
 
-    Path outputPath = new Path(outputFile);
-    RewriteOptions.Builder builder = new RewriteOptions.Builder(conf, inputPaths, outputPath);
+    RewriteOptions.Builder builder = createBuilder(inputPaths);
 
     Map<String, MaskMode> maskCols = Maps.newHashMap();
     maskCols.put("location.lat", MaskMode.NULLIFY);
@@ -380,8 +394,9 @@ public class ParquetRewriterTest {
     FileEncryptionProperties fileEncryptionProperties = EncDecProperties.getFileEncryptionProperties(
             encryptColumns, ParquetCipher.AES_GCM_CTR_V1, false);
 
-    Path outputPath = new Path(outputFile);
-    RewriteOptions options = new RewriteOptions.Builder(conf, inputPaths, outputPath)
+    RewriteOptions.Builder builder = createBuilder(inputPaths);
+
+    RewriteOptions options = builder
       .mask(maskColumns)
       .transform(CompressionCodecName.ZSTD)
       .encrypt(Arrays.asList(encryptColumns))
@@ -456,8 +471,7 @@ public class ParquetRewriterTest {
     for (EncryptionTestFile inputFile : inputFiles) {
       inputPaths.add(new Path(inputFile.getFileName()));
     }
-    Path outputPath = new Path(outputFile);
-    RewriteOptions.Builder builder = new RewriteOptions.Builder(conf, inputPaths, outputPath);
+    RewriteOptions.Builder builder = createBuilder(inputPaths);
     RewriteOptions options = builder.indexCacheStrategy(indexCacheStrategy).build();
 
     rewriter = new ParquetRewriter(options);
@@ -524,8 +538,7 @@ public class ParquetRewriterTest {
     for (EncryptionTestFile inputFile : inputFiles) {
       inputPaths.add(new Path(inputFile.getFileName()));
     }
-    Path outputPath = new Path(outputFile);
-    RewriteOptions.Builder builder = new RewriteOptions.Builder(conf, inputPaths, outputPath);
+    RewriteOptions.Builder builder = createBuilder(inputPaths);
     RewriteOptions options = builder.indexCacheStrategy(indexCacheStrategy).build();
 
     // This should throw an exception because the schemas are different
@@ -934,6 +947,19 @@ public class ParquetRewriterTest {
     return allBloomFilters;
   }
 
+  private RewriteOptions.Builder createBuilder(List<Path> inputPaths) throws IOException {
+    RewriteOptions.Builder builder;
+    if (usingHadoop) {
+      Path outputPath = new Path(outputFile);
+      builder = new RewriteOptions.Builder(conf, inputPaths, outputPath);
+    } else {
+      OutputFile outputPath = HadoopOutputFile.fromPath(new Path(outputFile), conf);
+      List<InputFile> inputs = inputPaths.stream().map(p -> HadoopInputFile.fromPathUnchecked(p, conf)).collect(Collectors.toList());
+      builder = new RewriteOptions.Builder(parquetConf, inputs, outputPath);
+    }
+    return builder;
+  }
+
   private void validateSchema() throws IOException {
     ParquetMetadata pmd = ParquetFileReader.readFooter(conf, new Path(outputFile), ParquetMetadataConverter.NO_FILTER);
     MessageType schema = pmd.getFileMetaData().getSchema();
