This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new 1a78a0166 PARQUET-2380: Decouple rewriter from Hadoop (#1195)
1a78a0166 is described below
commit 1a78a0166c748d81b7f19c922023be1e7dffc7b6
Author: Atour <[email protected]>
AuthorDate: Tue Nov 21 06:26:03 2023 +0100
PARQUET-2380: Decouple rewriter from Hadoop (#1195)
Extracts repeated code in the updated test fixture and restores a deleted
method that japicmp failed to detect.
---
.../parquet/hadoop/rewrite/ParquetRewriter.java | 37 +++---
.../parquet/hadoop/rewrite/RewriteOptions.java | 136 ++++++++++++++++++---
.../parquet/hadoop/util/HadoopInputFile.java | 8 ++
.../parquet/hadoop/util/HadoopOutputFile.java | 10 +-
.../hadoop/rewrite/ParquetRewriterTest.java | 70 +++++++----
5 files changed, 205 insertions(+), 56 deletions(-)
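For context, after this patch a rewrite can be driven entirely through the
parquet-common abstractions (ParquetConfiguration, InputFile, OutputFile)
instead of Hadoop's Configuration and Path. A minimal sketch, assuming the
builder overload added below and the existing ParquetRewriter.processBlocks()
entry point; the /tmp paths are placeholders for illustration only:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.conf.PlainParquetConfiguration;
    import org.apache.parquet.hadoop.rewrite.ParquetRewriter;
    import org.apache.parquet.hadoop.rewrite.RewriteOptions;
    import org.apache.parquet.hadoop.util.HadoopInputFile;
    import org.apache.parquet.hadoop.util.HadoopOutputFile;
    import org.apache.parquet.io.InputFile;
    import org.apache.parquet.io.OutputFile;

    public class RewriteSketch {
      public static void main(String[] args) throws Exception {
        Configuration hadoopConf = new Configuration();
        // Any InputFile/OutputFile implementation works; the Hadoop-backed
        // ones are used here only because they ship with parquet-hadoop.
        InputFile in = HadoopInputFile.fromPathUnchecked(new Path("/tmp/in.parquet"), hadoopConf);
        OutputFile out = HadoopOutputFile.fromPathUnchecked(new Path("/tmp/out.parquet"), hadoopConf);
        RewriteOptions options =
            new RewriteOptions.Builder(new PlainParquetConfiguration(), in, out).build();
        try (ParquetRewriter rewriter = new ParquetRewriter(options)) {
          rewriter.processBlocks();
        }
      }
    }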
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
index 004a1d135..0de0009f5 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
@@ -19,8 +19,7 @@
package org.apache.parquet.hadoop.rewrite;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.Preconditions;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.column.ColumnDescriptor;
@@ -34,6 +33,7 @@ import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.column.values.bloomfilter.BloomFilter;
import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.crypto.AesCipher;
import org.apache.parquet.crypto.InternalColumnEncryptionSetup;
import org.apache.parquet.crypto.InternalFileEncryptor;
@@ -54,10 +54,10 @@ import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.CompressionConverter.TransParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopCodecs;
-import org.apache.parquet.hadoop.util.HadoopInputFile;
-import org.apache.parquet.hadoop.util.HadoopOutputFile;
import org.apache.parquet.internal.column.columnindex.ColumnIndex;
import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.OutputFile;
import org.apache.parquet.io.ParquetEncodingException;
import org.apache.parquet.io.api.Converter;
import org.apache.parquet.io.api.GroupConverter;
@@ -96,7 +96,7 @@ public class ParquetRewriter implements Closeable {
private final int pageBufferSize = ParquetProperties.DEFAULT_PAGE_SIZE * 2;
private final byte[] pageBuffer = new byte[pageBufferSize];
// Configurations for the new file
- private CompressionCodecName newCodecName = null;
+ private final CompressionCodecName newCodecName;
private Map<ColumnPath, MaskMode> maskColumns = null;
private Set<ColumnPath> encryptColumns = null;
private boolean encryptMode = false;
@@ -122,11 +122,11 @@ public class ParquetRewriter implements Closeable {
private final IndexCache.CacheStrategy indexCacheStrategy;
public ParquetRewriter(RewriteOptions options) throws IOException {
- Configuration conf = options.getConf();
- Path outPath = options.getOutputFile();
- openInputFiles(options.getInputFiles(), conf);
+ ParquetConfiguration conf = options.getParquetConfiguration();
+ OutputFile out = options.getParquetOutputFile();
+ openInputFiles(options.getParquetInputFiles(), conf);
LOG.info("Start rewriting {} input file(s) {} to {}",
- inputFiles.size(), options.getInputFiles(), outPath);
+ inputFiles.size(), options.getParquetInputFiles(), out);
// Init reader of the first input file
initNextReader();
@@ -165,7 +165,7 @@ public class ParquetRewriter implements Closeable {
this.indexCacheStrategy = options.getIndexCacheStrategy();
ParquetFileWriter.Mode writerMode = ParquetFileWriter.Mode.CREATE;
- writer = new ParquetFileWriter(HadoopOutputFile.fromPath(outPath, conf), schema, writerMode,
+ writer = new ParquetFileWriter(out, schema, writerMode,
DEFAULT_BLOCK_SIZE, MAX_PADDING_SIZE_DEFAULT,
DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH,
DEFAULT_STATISTICS_TRUNCATE_LENGTH,
ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED,
options.getFileEncryptionProperties());
@@ -201,13 +201,13 @@ public class ParquetRewriter implements Closeable {
}
// Open all input files to validate their schemas are compatible to merge
- private void openInputFiles(List<Path> inputFiles, Configuration conf) {
+ private void openInputFiles(List<InputFile> inputFiles, ParquetConfiguration conf) {
Preconditions.checkArgument(inputFiles != null && !inputFiles.isEmpty(), "No input files");
- for (Path inputFile : inputFiles) {
+ for (InputFile inputFile : inputFiles) {
try {
TransParquetFileReader reader = new TransParquetFileReader(
- HadoopInputFile.fromPath(inputFile, conf), HadoopReadOptions.builder(conf).build());
+ inputFile, ParquetReadOptions.builder(conf).build());
MessageType inputFileSchema = reader.getFooter().getFileMetaData().getSchema();
if (this.schema == null) {
this.schema = inputFileSchema;
@@ -623,8 +623,7 @@ public class ParquetRewriter implements Closeable {
List<Type> fields = schema.getFields();
List<String> currentPath = new ArrayList<>();
List<Type> prunedFields = pruneColumnsInFields(fields, currentPath, prunePaths);
- MessageType newSchema = new MessageType(schema.getName(), prunedFields);
- return newSchema;
+ return new MessageType(schema.getName(), prunedFields);
}
private List<Type> pruneColumnsInFields(List<Type> fields, List<String> currentPath, Set<ColumnPath> prunePaths) {
@@ -797,10 +796,10 @@ public class ParquetRewriter implements Closeable {
private final BlockCipher.Encryptor metaDataEncryptor;
private final byte[] fileAAD;
- private byte[] dataPageHeaderAAD;
- private byte[] dataPageAAD;
- private byte[] dictPageHeaderAAD;
- private byte[] dictPageAAD;
+ private final byte[] dataPageHeaderAAD;
+ private final byte[] dataPageAAD;
+ private final byte[] dictPageHeaderAAD;
+ private final byte[] dictPageAAD;
public ColumnChunkEncryptorRunTime(InternalFileEncryptor fileEncryptor,
ColumnChunkMetaData chunk,
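The openInputFiles() change above is the heart of the decoupling: readers are
now built from any InputFile via ParquetReadOptions rather than from a Hadoop
Path via HadoopReadOptions. A hedged sketch of the new shape, assuming an
InputFile `inputFile` and a ParquetConfiguration `conf` in scope (this mirrors
the replacement lines in the hunk above):

    TransParquetFileReader reader = new TransParquetFileReader(
        inputFile, ParquetReadOptions.builder(conf).build());
    MessageType inputFileSchema = reader.getFooter().getFileMetaData().getSchema();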
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java
index 5bdc8d590..abe57e02d 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java
@@ -21,22 +21,31 @@ package org.apache.parquet.hadoop.rewrite;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.Preconditions;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.crypto.FileEncryptionProperties;
import org.apache.parquet.hadoop.IndexCache;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.util.ConfigurationUtil;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.OutputFile;
-import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
/**
* A set of options to create a ParquetRewriter.
*/
public class RewriteOptions {
- private final Configuration conf;
- private final List<Path> inputFiles;
- private final Path outputFile;
+ private final ParquetConfiguration conf;
+ private final List<InputFile> inputFiles;
+ private final OutputFile outputFile;
private final List<String> pruneColumns;
private final CompressionCodecName newCodecName;
private final Map<String, MaskMode> maskColumns;
@@ -44,9 +53,9 @@ public class RewriteOptions {
private final FileEncryptionProperties fileEncryptionProperties;
private final IndexCache.CacheStrategy indexCacheStrategy;
- private RewriteOptions(Configuration conf,
- List<Path> inputFiles,
- Path outputFile,
+ private RewriteOptions(ParquetConfiguration conf,
+ List<InputFile> inputFiles,
+ OutputFile outputFile,
List<String> pruneColumns,
CompressionCodecName newCodecName,
Map<String, MaskMode> maskColumns,
@@ -64,15 +73,70 @@ public class RewriteOptions {
this.indexCacheStrategy = indexCacheStrategy;
}
+ /**
+ * Gets the {@link Configuration} part of the rewrite options.
+ *
+ * @return the associated {@link Configuration}
+ */
public Configuration getConf() {
+ return ConfigurationUtil.createHadoopConfiguration(conf);
+ }
+
+ /**
+ * Gets the {@link ParquetConfiguration} part of the rewrite options.
+ *
+ * @return the associated {@link ParquetConfiguration}
+ */
+ public ParquetConfiguration getParquetConfiguration() {
return conf;
}
+ /**
+ * Gets the input {@link Path}s for the rewrite if they exist for all input files,
+ * otherwise throws a {@link RuntimeException}.
+ *
+ * @return a {@link List} of the associated input {@link Path}s
+ */
public List<Path> getInputFiles() {
+ return inputFiles.stream().map(f -> {
+ if (f instanceof HadoopOutputFile) {
+ HadoopOutputFile hadoopOutputFile = (HadoopOutputFile) f;
+ return new Path(hadoopOutputFile.getPath());
+ } else {
+ throw new RuntimeException("The input files do not all have an
associated Hadoop Path.");
+ }
+ }).collect(Collectors.toList());
+ }
+
+ /**
+ * Gets the {@link InputFile}s for the rewrite.
+ *
+ * @return a {@link List} of the associated {@link InputFile}s
+ */
+ public List<InputFile> getParquetInputFiles() {
return inputFiles;
}
+ /**
+ * Get the {@link Path} for the rewrite if it exists, otherwise throws a {@link RuntimeException}.
+ *
+ * @return the associated {@link Path} if it exists
+ */
public Path getOutputFile() {
+ if (outputFile instanceof HadoopOutputFile) {
+ HadoopOutputFile hadoopOutputFile = (HadoopOutputFile) outputFile;
+ return new Path(hadoopOutputFile.getPath());
+ } else {
+ throw new RuntimeException("The output file does not have an associated
Hadoop Path.");
+ }
+ }
+
+ /**
+ * Get the {@link OutputFile} for the rewrite.
+ *
+ * @return the associated {@link OutputFile}
+ */
+ public OutputFile getParquetOutputFile() {
return outputFile;
}
@@ -102,9 +166,9 @@ public class RewriteOptions {
// Builder to create a RewriterOptions.
public static class Builder {
- private final Configuration conf;
- private final List<Path> inputFiles;
- private final Path outputFile;
+ private final ParquetConfiguration conf;
+ private final List<InputFile> inputFiles;
+ private final OutputFile outputFile;
private List<String> pruneColumns;
private CompressionCodecName newCodecName;
private Map<String, MaskMode> maskColumns;
@@ -120,9 +184,18 @@ public class RewriteOptions {
* @param outputFile output file path to rewrite to
*/
public Builder(Configuration conf, Path inputFile, Path outputFile) {
- this.conf = conf;
- this.inputFiles = Arrays.asList(inputFile);
- this.outputFile = outputFile;
+ this(new HadoopParquetConfiguration(conf), HadoopInputFile.fromPathUnchecked(inputFile, conf), HadoopOutputFile.fromPathUnchecked(outputFile, conf));
+ }
+
+ /**
+ * Create a builder to create a RewriterOptions.
+ *
+ * @param conf configuration for reading from input files and writing to output file
+ * @param inputFile input file to read from
+ * @param outputFile output file to rewrite to
+ */
+ public Builder(ParquetConfiguration conf, InputFile inputFile, OutputFile outputFile) {
+ this(conf, Collections.singletonList(inputFile), outputFile);
}
/**
@@ -141,6 +214,30 @@ public class RewriteOptions {
* @param outputFile output file path to rewrite to
*/
public Builder(Configuration conf, List<Path> inputFiles, Path outputFile) {
+ this.conf = new HadoopParquetConfiguration(conf);
+ this.inputFiles = new ArrayList<>(inputFiles.size());
+ for (Path inputFile : inputFiles) {
+ this.inputFiles.add(HadoopInputFile.fromPathUnchecked(inputFile, conf));
+ }
+ this.outputFile = HadoopOutputFile.fromPathUnchecked(outputFile, conf);
+ }
+
+ /**
+ * Create a builder to create a RewriterOptions.
+ * <p>
+ * Please note that if merging more than one file, the schema of all files must be the same.
+ * Otherwise, the rewrite will fail.
+ * <p>
+ * The rewrite will keep original row groups from all input files. This may not be optimal
+ * if row groups are very small and will not solve small file problems. Instead, it will
+ * make it worse to have a large file footer in the output file.
+ * TODO: support rewrite by record to break the original row groups into reasonable ones.
+ *
+ * @param conf configuration for reading from input files and writing to output file
+ * @param inputFiles list of input file paths to read from
+ * @param outputFile output file path to rewrite to
+ */
+ public Builder(ParquetConfiguration conf, List<InputFile> inputFiles, OutputFile outputFile) {
this.conf = conf;
this.inputFiles = inputFiles;
this.outputFile = outputFile;
@@ -218,7 +315,18 @@ public class RewriteOptions {
* @return self
*/
public Builder addInputFile(Path path) {
- this.inputFiles.add(path);
+ this.inputFiles.add(HadoopInputFile.fromPathUnchecked(path, ConfigurationUtil.createHadoopConfiguration(conf)));
+ return this;
+ }
+
+ /**
+ * Add an input file to read from.
+ *
+ * @param inputFile input file to read from
+ * @return self
+ */
+ public Builder addInputFile(InputFile inputFile) {
+ this.inputFiles.add(inputFile);
return this;
}
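Taken together, RewriteOptions now stores InputFile/OutputFile internally and
keeps the Path-based accessors only as Hadoop-backed views that throw when the
underlying files are not Hadoop files. A short sketch of that asymmetry,
assuming hypothetical /tmp paths and the Path-based builder above (which wraps
them in Hadoop-backed files, so both accessors succeed here):

    Configuration conf = new Configuration();
    RewriteOptions options = new RewriteOptions.Builder(
        conf, new Path("/tmp/in.parquet"), new Path("/tmp/out.parquet")).build();
    OutputFile out = options.getParquetOutputFile(); // always available
    Path outPath = options.getOutputFile();          // Hadoop-backed only; else RuntimeException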
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java
index ad26430ba..5e799bfa8 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java
@@ -39,6 +39,14 @@ public class HadoopInputFile implements InputFile {
return new HadoopInputFile(fs, fs.getFileStatus(path), conf);
}
+ public static HadoopInputFile fromPathUnchecked(Path path, Configuration conf) {
+ try {
+ return fromPath(path, conf);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
public static HadoopInputFile fromStatus(FileStatus stat, Configuration conf)
throws IOException {
FileSystem fs = stat.getPath().getFileSystem(conf);
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopOutputFile.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopOutputFile.java
index 30ac50e9d..defe66620 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopOutputFile.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopOutputFile.java
@@ -54,11 +54,19 @@ public class HadoopOutputFile implements OutputFile {
private final Configuration conf;
public static HadoopOutputFile fromPath(Path path, Configuration conf)
- throws IOException {
+ throws IOException {
FileSystem fs = path.getFileSystem(conf);
return new HadoopOutputFile(fs, fs.makeQualified(path), conf);
}
+ public static HadoopOutputFile fromPathUnchecked(Path path, Configuration conf) {
+ try {
+ return fromPath(path, conf);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
private HadoopOutputFile(FileSystem fs, Path path, Configuration conf) {
this.fs = fs;
this.path = path;
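Both fromPathUnchecked() helpers simply wrap the checked IOException from
fromPath() in a RuntimeException, which lets them be called from lambdas where
a try/catch would be noise. For illustration, assuming a `List<Path> paths`
and a Hadoop `Configuration conf` in scope, this is the pattern the new
Builder constructors and the test fixture below rely on:

    List<InputFile> inputs = paths.stream()
        .map(p -> HadoopInputFile.fromPathUnchecked(p, conf))
        .collect(Collectors.toList());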
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
index ab25566a9..e66486531 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
@@ -27,6 +27,8 @@ import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.Version;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.conf.ParquetConfiguration;
+import org.apache.parquet.conf.PlainParquetConfiguration;
import org.apache.parquet.crypto.FileDecryptionProperties;
import org.apache.parquet.crypto.FileEncryptionProperties;
import org.apache.parquet.crypto.ParquetCipher;
@@ -52,11 +54,13 @@ import org.apache.parquet.hadoop.util.CompressionConverter.TransParquetFileReade
import org.apache.parquet.hadoop.util.EncDecProperties;
import org.apache.parquet.hadoop.util.EncryptionTestFile;
import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
import org.apache.parquet.hadoop.util.TestFileBuilder;
import org.apache.parquet.internal.column.columnindex.ColumnIndex;
import org.apache.parquet.internal.column.columnindex.OffsetIndex;
import org.apache.parquet.io.InputFile;
import org.apache.parquet.io.InvalidRecordException;
+import org.apache.parquet.io.OutputFile;
import org.apache.parquet.io.SeekableInputStream;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.InvalidSchemaException;
@@ -99,28 +103,40 @@ public class ParquetRewriterTest {
private final int numRecord = 100000;
private final Configuration conf = new Configuration();
+ private final ParquetConfiguration parquetConf = new PlainParquetConfiguration();
private final ParquetProperties.WriterVersion writerVersion;
private final IndexCache.CacheStrategy indexCacheStrategy;
+ private final boolean usingHadoop;
private List<EncryptionTestFile> inputFiles = null;
private String outputFile = null;
private ParquetRewriter rewriter = null;
- @Parameterized.Parameters(name = "WriterVersion = {0}, IndexCacheStrategy =
{1}")
+ @Parameterized.Parameters(name = "WriterVersion = {0}, IndexCacheStrategy =
{1}, UsingHadoop = {2}")
public static Object[][] parameters() {
- return new Object[][] {{"v1", "NONE"}, {"v1", "PREFETCH_BLOCK"}, {"v2", "NONE"}, {"v2", "PREFETCH_BLOCK"}};
- }
-
- public ParquetRewriterTest(String writerVersion, String indexCacheStrategy) {
+ return new Object[][] {
+ {"v1", "NONE", true},
+ {"v1", "PREFETCH_BLOCK", true},
+ {"v2", "NONE", true},
+ {"v2", "PREFETCH_BLOCK", true},
+ {"v1", "NONE", false},
+ {"v1", "PREFETCH_BLOCK", false},
+ {"v2", "NONE", false},
+ {"v2", "PREFETCH_BLOCK", false}
+ };
+ }
+
+ public ParquetRewriterTest(String writerVersion, String indexCacheStrategy, boolean usingHadoop) {
this.writerVersion = ParquetProperties.WriterVersion.fromString(writerVersion);
this.indexCacheStrategy = IndexCache.CacheStrategy.valueOf(indexCacheStrategy);
+ this.usingHadoop = usingHadoop;
}
private void testPruneSingleColumnTranslateCodec(List<Path> inputPaths) throws Exception {
- Path outputPath = new Path(outputFile);
- List<String> pruneColumns = Arrays.asList("Gender");
+ RewriteOptions.Builder builder = createBuilder(inputPaths);
+
+ List<String> pruneColumns = Collections.singletonList("Gender");
CompressionCodecName newCodec = CompressionCodecName.ZSTD;
- RewriteOptions.Builder builder = new RewriteOptions.Builder(conf, inputPaths, outputPath);
RewriteOptions options = builder.prune(pruneColumns).transform(newCodec).indexCacheStrategy(indexCacheStrategy).build();
@@ -177,12 +193,12 @@ public class ParquetRewriterTest {
}
private void testPruneNullifyTranslateCodec(List<Path> inputPaths) throws Exception {
- Path outputPath = new Path(outputFile);
- List<String> pruneColumns = Arrays.asList("Gender");
+ RewriteOptions.Builder builder = createBuilder(inputPaths);
+
+ List<String> pruneColumns = Collections.singletonList("Gender");
Map<String, MaskMode> maskColumns = new HashMap<>();
maskColumns.put("Links.Forward", MaskMode.NULLIFY);
CompressionCodecName newCodec = CompressionCodecName.ZSTD;
- RewriteOptions.Builder builder = new RewriteOptions.Builder(conf, inputPaths, outputPath);
RewriteOptions options = builder.prune(pruneColumns).mask(maskColumns).transform(newCodec).indexCacheStrategy(indexCacheStrategy).build();
@@ -233,11 +249,10 @@ public class ParquetRewriterTest {
}
private void testPruneEncryptTranslateCodec(List<Path> inputPaths) throws Exception {
- Path outputPath = new Path(outputFile);
- RewriteOptions.Builder builder = new RewriteOptions.Builder(conf, inputPaths, outputPath);
+ RewriteOptions.Builder builder = createBuilder(inputPaths);
// Prune
- List<String> pruneColumns = Arrays.asList("Gender");
+ List<String> pruneColumns = Collections.singletonList("Gender");
builder.prune(pruneColumns);
// Translate codec
@@ -314,8 +329,7 @@ public class ParquetRewriterTest {
inputFiles = inputPaths.stream().map(p -> new EncryptionTestFile(p.toString(), null)).collect(Collectors.toList());
- Path outputPath = new Path(outputFile);
- RewriteOptions.Builder builder = new RewriteOptions.Builder(conf, inputPaths, outputPath);
+ RewriteOptions.Builder builder = createBuilder(inputPaths);
Map<String, MaskMode> maskCols = Maps.newHashMap();
maskCols.put("location.lat", MaskMode.NULLIFY);
@@ -380,8 +394,9 @@ public class ParquetRewriterTest {
FileEncryptionProperties fileEncryptionProperties = EncDecProperties.getFileEncryptionProperties(
encryptColumns, ParquetCipher.AES_GCM_CTR_V1, false);
- Path outputPath = new Path(outputFile);
- RewriteOptions options = new RewriteOptions.Builder(conf, inputPaths, outputPath)
+ RewriteOptions.Builder builder = createBuilder(inputPaths);
+
+ RewriteOptions options = builder
.mask(maskColumns)
.transform(CompressionCodecName.ZSTD)
.encrypt(Arrays.asList(encryptColumns))
@@ -456,8 +471,7 @@ public class ParquetRewriterTest {
for (EncryptionTestFile inputFile : inputFiles) {
inputPaths.add(new Path(inputFile.getFileName()));
}
- Path outputPath = new Path(outputFile);
- RewriteOptions.Builder builder = new RewriteOptions.Builder(conf, inputPaths, outputPath);
+ RewriteOptions.Builder builder = createBuilder(inputPaths);
RewriteOptions options = builder.indexCacheStrategy(indexCacheStrategy).build();
rewriter = new ParquetRewriter(options);
@@ -524,8 +538,7 @@ public class ParquetRewriterTest {
for (EncryptionTestFile inputFile : inputFiles) {
inputPaths.add(new Path(inputFile.getFileName()));
}
- Path outputPath = new Path(outputFile);
- RewriteOptions.Builder builder = new RewriteOptions.Builder(conf, inputPaths, outputPath);
+ RewriteOptions.Builder builder = createBuilder(inputPaths);
RewriteOptions options = builder.indexCacheStrategy(indexCacheStrategy).build();
// This should throw an exception because the schemas are different
@@ -934,6 +947,19 @@ public class ParquetRewriterTest {
return allBloomFilters;
}
+ private RewriteOptions.Builder createBuilder(List<Path> inputPaths) throws IOException {
+ RewriteOptions.Builder builder;
+ if (usingHadoop) {
+ Path outputPath = new Path(outputFile);
+ builder = new RewriteOptions.Builder(conf, inputPaths, outputPath);
+ } else {
+ OutputFile outputPath = HadoopOutputFile.fromPath(new Path(outputFile), conf);
+ List<InputFile> inputs = inputPaths.stream().map(p -> HadoopInputFile.fromPathUnchecked(p, conf)).collect(Collectors.toList());
+ builder = new RewriteOptions.Builder(parquetConf, inputs, outputPath);
+ }
+ return builder;
+ }
+
private void validateSchema() throws IOException {
ParquetMetadata pmd = ParquetFileReader.readFooter(conf, new Path(outputFile), ParquetMetadataConverter.NO_FILTER);
MessageType schema = pmd.getFileMetaData().getSchema();