This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-java.git
The following commit(s) were added to refs/heads/master by this push:
new c928c4bfc GH-2959: Optimize the test case of parquet rewriter. (#2960)
c928c4bfc is described below
commit c928c4bfc15717d39057ba14663ecda7ff3f8bf2
Author: joyCurry30 <[email protected]>
AuthorDate: Tue Jul 23 16:31:14 2024 +0800
GH-2959: Optimize the test case of parquet rewriter. (#2960)
---
.../hadoop/rewrite/ParquetRewriterTest.java | 134 ++++++++++++---------
.../parquet/hadoop/util/TestFileBuilder.java | 28 +++--
2 files changed, 91 insertions(+), 71 deletions(-)
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
index 2795a6eba..0573d4e33 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
@@ -107,10 +107,13 @@ public class ParquetRewriterTest {
private final IndexCache.CacheStrategy indexCacheStrategy;
private final boolean usingHadoop;
- private List<EncryptionTestFile> inputFiles = null;
+ private List<EncryptionTestFile> inputFiles = Lists.newArrayList();
private String outputFile = null;
private ParquetRewriter rewriter = null;
+ private final EncryptionTestFile
gzipEncryptionTestFileWithoutBloomFilterColumn;
+ private final EncryptionTestFile
uncompressedEncryptionTestFileWithoutBloomFilterColumn;
+
@Parameterized.Parameters(name = "WriterVersion = {0}, IndexCacheStrategy =
{1}, UsingHadoop = {2}")
public static Object[][] parameters() {
return new Object[][] {
@@ -121,10 +124,26 @@ public class ParquetRewriterTest {
};
}
- public ParquetRewriterTest(String writerVersion, String indexCacheStrategy, boolean usingHadoop) {
+ public ParquetRewriterTest(String writerVersion, String indexCacheStrategy, boolean usingHadoop)
+ throws IOException {
this.writerVersion = ParquetProperties.WriterVersion.fromString(writerVersion);
this.indexCacheStrategy = IndexCache.CacheStrategy.valueOf(indexCacheStrategy);
this.usingHadoop = usingHadoop;
+
+ MessageType testSchema = createSchema();
+ this.gzipEncryptionTestFileWithoutBloomFilterColumn = new TestFileBuilder(conf, testSchema)
+ .withNumRecord(numRecord)
+ .withCodec("GZIP")
+ .withPageSize(1024)
+ .withRowGroupSize(1024L) // small row-group target, intended to give testRewriteFileWithMultipleBlocks several blocks
+ .withWriterVersion(this.writerVersion)
+ .build();
+ this.uncompressedEncryptionTestFileWithoutBloomFilterColumn = new TestFileBuilder(conf, testSchema)
+ .withNumRecord(numRecord)
+ .withCodec("UNCOMPRESSED")
+ .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
+ .withWriterVersion(this.writerVersion)
+ .build();
}
private void testPruneSingleColumnTranslateCodec(List<Path> inputPaths)
throws Exception {
@@ -141,7 +160,7 @@ public class ParquetRewriterTest {
rewriter.processBlocks();
rewriter.close();
- // Verify the schema are not changed for the columns not pruned
+ // Verify the schema is not changed for the columns not pruned
validateSchema();
// Verify codec has been translated
@@ -179,7 +198,7 @@ public class ParquetRewriterTest {
@Test
public void testPruneSingleColumnTranslateCodecSingleFile() throws Exception
{
- testSingleInputFileSetup("GZIP");
+ ensureContainsGzipFile();
List<Path> inputPaths = new ArrayList<Path>() {
{
add(new Path(inputFiles.get(0).getFileName()));
@@ -190,7 +209,8 @@ public class ParquetRewriterTest {
@Test
public void testPruneSingleColumnTranslateCodecTwoFiles() throws Exception {
- testMultipleInputFilesSetup();
+ ensureContainsGzipFile();
+ ensureContainsUncompressedFile();
List<Path> inputPaths = new ArrayList<Path>() {
{
add(new Path(inputFiles.get(0).getFileName()));
@@ -249,7 +269,8 @@ public class ParquetRewriterTest {
@Test
public void testPruneNullifyTranslateCodecSingleFile() throws Exception {
- testSingleInputFileSetup("GZIP");
+ ensureContainsGzipFile();
+
List<Path> inputPaths = new ArrayList<Path>() {
{
add(new Path(inputFiles.get(0).getFileName()));
@@ -260,7 +281,9 @@ public class ParquetRewriterTest {
@Test
public void testPruneNullifyTranslateCodecTwoFiles() throws Exception {
- testMultipleInputFilesSetup();
+ ensureContainsGzipFile();
+ ensureContainsUncompressedFile();
+
List<Path> inputPaths = new ArrayList<Path>() {
{
add(new Path(inputFiles.get(0).getFileName()));
@@ -294,7 +317,7 @@ public class ParquetRewriterTest {
rewriter.processBlocks();
rewriter.close();
- // Verify the schema are not changed for the columns not pruned
+ // Verify the schema is not changed for the columns not pruned
validateSchema();
// Verify codec has been translated
@@ -331,7 +354,8 @@ public class ParquetRewriterTest {
@Test
public void testPruneEncryptTranslateCodecSingleFile() throws Exception {
- testSingleInputFileSetup("GZIP");
+ ensureContainsGzipFile();
+
List<Path> inputPaths = new ArrayList<Path>() {
{
add(new Path(inputFiles.get(0).getFileName()));
@@ -342,7 +366,9 @@ public class ParquetRewriterTest {
@Test
public void testPruneEncryptTranslateCodecTwoFiles() throws Exception {
- testMultipleInputFilesSetup();
+ ensureContainsGzipFile();
+ ensureContainsUncompressedFile();
+
List<Path> inputPaths = new ArrayList<Path>() {
{
add(new Path(inputFiles.get(0).getFileName()));
@@ -383,7 +409,7 @@ public class ParquetRewriterTest {
rewriter.processBlocks();
rewriter.close();
- // Verify the schema are not changed for the columns not pruned
+ // Verify the schema is not changed for the columns not pruned
ParquetMetadata pmd =
ParquetFileReader.readFooter(conf, new Path(outputFile),
ParquetMetadataConverter.NO_FILTER);
MessageType schema = pmd.getFileMetaData().getSchema();
@@ -413,7 +439,7 @@ public class ParquetRewriterTest {
assertEquals(inRead.getLong("id", 0), outRead.getLong("id", 0));
assertEquals(inRead.getString("name", 0), outRead.getString("name",
0));
- // location was nulled
+ // location was nullified
Group finalOutRead = outRead;
assertThrows(
RuntimeException.class,
@@ -422,7 +448,7 @@ public class ParquetRewriterTest {
RuntimeException.class,
() -> finalOutRead.getGroup("location", 0).getDouble("lon", 0));
- // phonenumbers was pruned
+ // phoneNumbers was pruned
assertThrows(InvalidRecordException.class, () ->
finalOutRead.getGroup("phoneNumbers", 0));
}
}
@@ -497,7 +523,8 @@ public class ParquetRewriterTest {
@Test
public void testNullifyEncryptSingleFile() throws Exception {
- testSingleInputFileSetup("GZIP");
+ ensureContainsGzipFile();
+
List<Path> inputPaths = new ArrayList<Path>() {
{
add(new Path(inputFiles.get(0).getFileName()));
@@ -508,7 +535,9 @@ public class ParquetRewriterTest {
@Test
public void testNullifyEncryptTwoFiles() throws Exception {
- testMultipleInputFilesSetup();
+ ensureContainsGzipFile();
+ ensureContainsUncompressedFile();
+
List<Path> inputPaths = new ArrayList<Path>() {
{
add(new Path(inputFiles.get(0).getFileName()));
@@ -520,7 +549,8 @@ public class ParquetRewriterTest {
@Test
public void testMergeTwoFilesOnly() throws Exception {
- testMultipleInputFilesSetup();
+ ensureContainsGzipFile();
+ ensureContainsUncompressedFile();
// Only merge two files but do not change anything.
List<Path> inputPaths = new ArrayList<>();
@@ -534,7 +564,7 @@ public class ParquetRewriterTest {
rewriter.processBlocks();
rewriter.close();
- // Verify the schema are not changed
+ // Verify the schema is not changed
ParquetMetadata pmd =
ParquetFileReader.readFooter(conf, new Path(outputFile),
ParquetMetadataConverter.NO_FILTER);
MessageType schema = pmd.getFileMetaData().getSchema();
@@ -615,7 +645,8 @@ public class ParquetRewriterTest {
@Test
public void testRewriteFileWithMultipleBlocks() throws Exception {
- testSingleInputFileSetup("GZIP", 1024L);
+ ensureContainsGzipFile();
+
List<Path> inputPaths = new ArrayList<Path>() {
{
add(new Path(inputFiles.get(0).getFileName()));
@@ -626,7 +657,7 @@ public class ParquetRewriterTest {
@Test
public void testPruneSingleColumnTranslateCodecAndEnableBloomFilter() throws
Exception {
- testSingleInputFileSetupWithBloomFilter("GZIP", "DocId");
+ testSingleInputFileSetupWithBloomFilter("DocId");
List<Path> inputPaths = new ArrayList<Path>() {
{
add(new Path(inputFiles.get(0).getFileName()));
@@ -635,14 +666,14 @@ public class ParquetRewriterTest {
testPruneSingleColumnTranslateCodec(inputPaths);
// Verify bloom filters
- Map<ColumnPath, List<BloomFilter>> inputBloomFilters =
allInputBloomFilters(null);
+ Map<ColumnPath, List<BloomFilter>> inputBloomFilters =
allInputBloomFilters();
Map<ColumnPath, List<BloomFilter>> outputBloomFilters =
allOutputBloomFilters(null);
assertEquals(inputBloomFilters, outputBloomFilters);
}
@Test
public void testPruneNullifyTranslateCodecAndEnableBloomFilter() throws
Exception {
- testSingleInputFileSetupWithBloomFilter("GZIP", "DocId", "Links.Forward");
+ testSingleInputFileSetupWithBloomFilter("DocId", "Links.Forward");
List<Path> inputPaths = new ArrayList<Path>() {
{
add(new Path(inputFiles.get(0).getFileName()));
@@ -651,7 +682,7 @@ public class ParquetRewriterTest {
testPruneNullifyTranslateCodec(inputPaths);
// Verify bloom filters
- Map<ColumnPath, List<BloomFilter>> inputBloomFilters =
allInputBloomFilters(null);
+ Map<ColumnPath, List<BloomFilter>> inputBloomFilters =
allInputBloomFilters();
assertEquals(inputBloomFilters.size(), 2);
assertTrue(inputBloomFilters.containsKey(ColumnPath.fromDotString("Links.Forward")));
assertTrue(inputBloomFilters.containsKey(ColumnPath.fromDotString("DocId")));
@@ -666,7 +697,7 @@ public class ParquetRewriterTest {
@Test
public void testPruneEncryptTranslateCodecAndEnableBloomFilter() throws
Exception {
- testSingleInputFileSetupWithBloomFilter("GZIP", "DocId", "Links.Forward");
+ testSingleInputFileSetupWithBloomFilter("DocId", "Links.Forward");
List<Path> inputPaths = new ArrayList<Path>() {
{
add(new Path(inputFiles.get(0).getFileName()));
@@ -675,7 +706,7 @@ public class ParquetRewriterTest {
testPruneEncryptTranslateCodec(inputPaths);
// Verify bloom filters
- Map<ColumnPath, List<BloomFilter>> inputBloomFilters =
allInputBloomFilters(null);
+ Map<ColumnPath, List<BloomFilter>> inputBloomFilters =
allInputBloomFilters();
// Cannot read without FileDecryptionProperties
assertThrows(ParquetCryptoRuntimeException.class, () ->
allOutputBloomFilters(null));
@@ -685,42 +716,28 @@ public class ParquetRewriterTest {
assertEquals(inputBloomFilters, outputBloomFilters);
}
- private void testSingleInputFileSetup(String compression) throws IOException {
- testSingleInputFileSetup(compression, ParquetWriter.DEFAULT_BLOCK_SIZE);
- }
-
- private void testSingleInputFileSetupWithBloomFilter(String compression, String... bloomFilterEnabledColumns)
- throws IOException {
- testSingleInputFileSetup(compression, ParquetWriter.DEFAULT_BLOCK_SIZE, bloomFilterEnabledColumns);
+ /**
+ * Replaces {@code inputFiles} with a single freshly written GZIP file that has bloom filters enabled for the
+ * given columns.
+ */
+ private void testSingleInputFileSetupWithBloomFilter(String... bloomFilterEnabledColumns) throws IOException {
+ testSingleInputFileSetup(bloomFilterEnabledColumns);
}
- private void testSingleInputFileSetup(String compression, long rowGroupSize, String... bloomFilterEnabledColumns)
- throws IOException {
- MessageType schema = createSchema();
- inputFiles = Lists.newArrayList();
- inputFiles.add(new TestFileBuilder(conf, schema)
- .withNumRecord(numRecord)
- .withCodec(compression)
- .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
- .withRowGroupSize(rowGroupSize)
- .withBloomFilterEnabled(bloomFilterEnabledColumns)
- .withWriterVersion(writerVersion)
- .build());
- }
-
- private void testMultipleInputFilesSetup() throws IOException {
+ /**
+ * Writes one GZIP test file (DEFAULT_PAGE_SIZE pages, ParquetWriter.DEFAULT_BLOCK_SIZE row groups, bloom filters
+ * on the given columns) and makes it the only entry of {@code inputFiles}. Fixtures previously added through
+ * ensureContainsGzipFile()/ensureContainsUncompressedFile() are discarded: the list is replaced, not appended to.
+ */
+ private void testSingleInputFileSetup(String... bloomFilterEnabledColumns) throws IOException {
MessageType schema = createSchema();
inputFiles = Lists.newArrayList();
inputFiles.add(new TestFileBuilder(conf, schema)
.withNumRecord(numRecord)
.withCodec("GZIP")
.withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
- .withWriterVersion(writerVersion)
- .build());
- inputFiles.add(new TestFileBuilder(conf, schema)
- .withNumRecord(numRecord)
- .withCodec("UNCOMPRESSED")
- .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
+ .withRowGroupSize(ParquetWriter.DEFAULT_BLOCK_SIZE)
+ .withBloomFilterEnabled(bloomFilterEnabledColumns)
.withWriterVersion(writerVersion)
.build());
}
@@ -748,7 +756,7 @@ public class ParquetRewriterTest {
.withDecryption(fileDecryptionProperties)
.build();
- // Get total number of rows from input files
+ // Get the total number of rows from input files
int totalRows = 0;
for (EncryptionTestFile inputFile : inputFiles) {
totalRows += inputFile.getFileContent().length;
@@ -821,7 +829,7 @@ public class ParquetRewriterTest {
ParquetReadOptions readOptions = ParquetReadOptions.builder()
.withDecryption(fileDecryptionProperties)
.build();
- ParquetMetadata pmd = null;
+ ParquetMetadata pmd;
InputFile inputFile = HadoopInputFile.fromPath(new Path(file), conf);
try (SeekableInputStream in = inputFile.newStream()) {
pmd = ParquetFileReader.readFooter(inputFile, readOptions, in);
@@ -995,12 +1003,10 @@ public class ParquetRewriterTest {
assertEquals(inputRowCounts, outputRowCounts);
}
- private Map<ColumnPath, List<BloomFilter>>
allInputBloomFilters(FileDecryptionProperties fileDecryptionProperties)
- throws Exception {
+ private Map<ColumnPath, List<BloomFilter>> allInputBloomFilters() throws
Exception {
Map<ColumnPath, List<BloomFilter>> inputBloomFilters = new HashMap<>();
for (EncryptionTestFile inputFile : inputFiles) {
- Map<ColumnPath, List<BloomFilter>> bloomFilters =
- allBloomFilters(inputFile.getFileName(), fileDecryptionProperties);
+ Map<ColumnPath, List<BloomFilter>> bloomFilters =
allBloomFilters(inputFile.getFileName(), null);
for (Map.Entry<ColumnPath, List<BloomFilter>> entry :
bloomFilters.entrySet()) {
List<BloomFilter> bloomFilterList =
inputBloomFilters.getOrDefault(entry.getKey(), new ArrayList<>());
bloomFilterList.addAll(entry.getValue());
@@ -1072,4 +1078,24 @@ public class ParquetRewriterTest {
assertEquals(subFields.get(0).getName(), "Backward");
assertEquals(subFields.get(1).getName(), "Forward");
}
+
+ /** Adds the shared GZIP fixture to {@code inputFiles} unless it is already present. */
+ private void ensureContainsGzipFile() {
+ addIfAbsent(gzipEncryptionTestFileWithoutBloomFilterColumn);
+ }
+
+ /** Adds the shared UNCOMPRESSED fixture to {@code inputFiles} unless it is already present. */
+ private void ensureContainsUncompressedFile() {
+ addIfAbsent(uncompressedEncryptionTestFileWithoutBloomFilterColumn);
+ }
+
+ /**
+ * Appends {@code file} to {@code inputFiles} unless the list already contains it, so a test can request the same
+ * fixture more than once without duplicating it; files stay in the order they were first requested.
+ */
+ private void addIfAbsent(EncryptionTestFile file) {
+ if (!inputFiles.contains(file)) {
+ inputFiles.add(file);
+ }
+ }
}
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestFileBuilder.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestFileBuilder.java
index 809b4fd35..003d14355 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestFileBuilder.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestFileBuilder.java
@@ -41,8 +41,9 @@ import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
public class TestFileBuilder {
- private MessageType schema;
- private Configuration conf;
+ private static final char[] CHARS = {'a', 'b', 'c', 'd', 'e', 'f', 'g', 'x', 'z', 'y'};
+ private final MessageType schema;
+ private final Configuration conf;
private Map<String, String> extraMeta = new HashMap<>();
private int numRecord = 100000;
private ParquetProperties.WriterVersion writerVersion = ParquetProperties.WriterVersion.PARQUET_1_0;
@@ -164,7 +165,7 @@ public class TestFileBuilder {
private void addPrimitiveValueToSimpleGroup(Group g, PrimitiveType primitiveType) {
if (primitiveType.isRepetition(Type.Repetition.REPEATED)) {
- int listSize = ThreadLocalRandom.current().nextInt(1, 10);
+ int listSize = random().nextInt(1, 10);
for (int i = 0; i < listSize; i++) {
addSinglePrimitiveValueToSimpleGroup(g, primitiveType);
}
@@ -191,39 +192,46 @@ public class TestFileBuilder {
}
+
+ // Resolve the generator on every call: ThreadLocalRandom.current() must not be cached in a shared field,
+ // otherwise the generator of the thread that initialized the class would be used from other threads.
+ private static ThreadLocalRandom random() {
+ return ThreadLocalRandom.current();
+ }
private static long getInt() {
- return ThreadLocalRandom.current().nextInt(10000);
+ return random().nextInt(10000);
}
private static long getLong() {
- return ThreadLocalRandom.current().nextLong(100000);
+ return random().nextLong(100000);
}
private static String getString() {
- char[] chars = {'a', 'b', 'c', 'd', 'e', 'f', 'g', 'x', 'z', 'y'};
+ // Draw the length once; a nextInt(...) call in the loop condition would re-roll the bound on every iteration.
+ int length = random().nextInt(100);
StringBuilder sb = new StringBuilder();
- for (int i = 0; i < 100; i++) {
- sb.append(chars[ThreadLocalRandom.current().nextInt(10)]);
+ for (int i = 0; i < length; i++) {
+ sb.append(CHARS[random().nextInt(CHARS.length)]);
}
return sb.toString();
}
private static float getFloat() {
- if (ThreadLocalRandom.current().nextBoolean()) {
+ if (random().nextBoolean()) {
return Float.NaN;
}
- return ThreadLocalRandom.current().nextFloat();
+ return random().nextFloat();
}
private static double getDouble() {
- if (ThreadLocalRandom.current().nextBoolean()) {
+ if (random().nextBoolean()) {
return Double.NaN;
}
- return ThreadLocalRandom.current().nextDouble();
+ return random().nextDouble();
}
public static String createTempFile(String prefix) {
try {
- return Files.createTempDirectory(prefix).toAbsolutePath().toString() + "/test.parquet";
+ return Files.createTempDirectory(prefix).toAbsolutePath() + "/test.parquet";
} catch (IOException e) {
throw new AssertionError("Unable to create temporary file", e);
}