Repository: parquet-mr Updated Branches: refs/heads/master b1ea059a6 -> 5294c64b3
PARQUET-373: Fix flaky MemoryManager tests. Author: Ryan Blue <[email protected]> Closes #269 from rdblue/PARQUET-373-fix-flaky-mem-manager-tests and squashes the following commits: 1b55889 [Ryan Blue] PARQUET-373: Fix flaky MemoryManager tests. Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/5294c64b Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/5294c64b Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/5294c64b Branch: refs/heads/master Commit: 5294c64b342818e021800b38413f36f426e35b3c Parents: b1ea059 Author: Ryan Blue <[email protected]> Authored: Mon Oct 19 15:51:07 2015 -0700 Committer: Ryan Blue <[email protected]> Committed: Mon Oct 19 15:51:07 2015 -0700 ---------------------------------------------------------------------- .../parquet/hadoop/TestMemoryManager.java | 167 +++++++++++++------ 1 file changed, 114 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/5294c64b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestMemoryManager.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestMemoryManager.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestMemoryManager.java index fb71b86..b91fedd 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestMemoryManager.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestMemoryManager.java @@ -18,14 +18,14 @@ */ package org.apache.parquet.hadoop; -import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.RecordWriter; -import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import org.apache.parquet.hadoop.example.GroupWriteSupport; import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.schema.MessageTypeParser; @@ -44,89 +44,150 @@ public class TestMemoryManager { "required int32 line;\n" + "required binary content;\n" + "}"; - long expectPoolSize; - int rowGroupSize; + long expectedPoolSize; ParquetOutputFormat parquetOutputFormat; - CompressionCodecName codec; int counter = 0; - boolean firstRegister = true; @Before - public void setUp() { - GroupWriteSupport.setSchema(MessageTypeParser.parseMessageType(writeSchema),conf); - expectPoolSize = Math.round((double) ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax - () * MemoryManager.DEFAULT_MEMORY_POOL_RATIO); - rowGroupSize = (int) Math.floor(expectPoolSize / 2); - conf.setInt(ParquetOutputFormat.BLOCK_SIZE, rowGroupSize); - codec = CompressionCodecName.UNCOMPRESSED; + public void setUp() throws Exception { + parquetOutputFormat = new ParquetOutputFormat(new GroupWriteSupport()); + + GroupWriteSupport.setSchema(MessageTypeParser.parseMessageType(writeSchema), conf); + expectedPoolSize = Math.round((double) + ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax() * + MemoryManager.DEFAULT_MEMORY_POOL_RATIO); + + long rowGroupSize = expectedPoolSize / 2; + conf.setLong(ParquetOutputFormat.BLOCK_SIZE, rowGroupSize); + + // the memory manager is not initialized until a writer is created + createWriter(1).close(null); } - @After - public void tearDown() throws Exception{ - FileUtils.deleteDirectory(new File("target/test")); + @Test + public void testMemoryManagerUpperLimit() { + // Verify the memory pool size + // this value tends to change a little between setup and tests, so this + // validates that it is within 5% of the expected value + long poolSize = ParquetOutputFormat.getMemoryManager().getTotalMemoryPool(); + Assert.assertTrue("Pool size should be within 5% of the expected value", + Math.abs(expectedPoolSize - poolSize) < (long) (expectedPoolSize * 0.05)); } @Test public void testMemoryManager() throws Exception { - //Verify the adjusted rowGroupSize of writers + long poolSize = ParquetOutputFormat.getMemoryManager().getTotalMemoryPool(); + long rowGroupSize = poolSize / 2; + conf.setLong(ParquetOutputFormat.BLOCK_SIZE, rowGroupSize); + + Assert.assertTrue("Pool should hold 2 full row groups", + (2 * rowGroupSize) <= poolSize); + Assert.assertTrue("Pool should not hold 3 full row groups", + poolSize < (3 * rowGroupSize)); + + Assert.assertEquals("Allocations should start out at 0", + 0, getTotalAllocation()); + RecordWriter writer1 = createWriter(1); - verifyRowGroupSize(rowGroupSize); + Assert.assertTrue("Allocations should never exceed pool size", + getTotalAllocation() <= poolSize); + Assert.assertEquals("First writer should be limited by row group size", + rowGroupSize, getTotalAllocation()); RecordWriter writer2 = createWriter(2); - verifyRowGroupSize(rowGroupSize); + Assert.assertTrue("Allocations should never exceed pool size", + getTotalAllocation() <= poolSize); + Assert.assertEquals("Second writer should be limited by row group size", + 2 * rowGroupSize, getTotalAllocation()); RecordWriter writer3 = createWriter(3); - verifyRowGroupSize((int) Math.floor(expectPoolSize / 3)); + Assert.assertTrue("Allocations should never exceed pool size", + getTotalAllocation() <= poolSize); writer1.close(null); - verifyRowGroupSize(rowGroupSize); + Assert.assertTrue("Allocations should never exceed pool size", + getTotalAllocation() <= poolSize); + Assert.assertEquals("Allocations should be increased to the row group size", + 2 * rowGroupSize, getTotalAllocation()); writer2.close(null); - verifyRowGroupSize(rowGroupSize); + Assert.assertTrue("Allocations should never exceed pool size", + getTotalAllocation() <= poolSize); + Assert.assertEquals("Allocations should be increased to the row group size", + rowGroupSize, getTotalAllocation()); writer3.close(null); + Assert.assertEquals("Allocations should be increased to the row group size", + 0, getTotalAllocation()); + } - //Verify the memory pool - Assert.assertEquals("memory pool size is incorrect.", expectPoolSize, - parquetOutputFormat.getMemoryManager().getTotalMemoryPool()); + @Test + public void testReallocationCallback() throws Exception { + // validate assumptions + long poolSize = ParquetOutputFormat.getMemoryManager().getTotalMemoryPool(); + long rowGroupSize = poolSize / 2; + conf.setLong(ParquetOutputFormat.BLOCK_SIZE, rowGroupSize); + + Assert.assertTrue("Pool should hold 2 full row groups", + (2 * rowGroupSize) <= poolSize); + Assert.assertTrue("Pool should not hold 3 full row groups", + poolSize < (3 * rowGroupSize)); + + Runnable callback = new Runnable() { + @Override + public void run() { + counter++; + } + }; - //Verify Callback mechanism - Assert.assertEquals("counter calculated by callback is incorrect.", 1, counter); - Assert.assertEquals("CallBack is duplicated.", 1, parquetOutputFormat.getMemoryManager() - .getScaleCallBacks().size()); - } + // first-time registration should succeed + ParquetOutputFormat.getMemoryManager() + .registerScaleCallBack("increment-test-counter", callback); - private RecordWriter createWriter(int index) throws Exception{ - Path file = new Path("target/test/", "parquet" + index); - parquetOutputFormat = new ParquetOutputFormat(new GroupWriteSupport()); - RecordWriter writer = parquetOutputFormat.getRecordWriter(conf, file, codec); try { - parquetOutputFormat.getMemoryManager().registerScaleCallBack("increment-test-counter", - new Runnable() { - @Override - public void run() { - counter++; - } - }); - if (!firstRegister) { - Assert.fail("Duplicated registering callback should throw duplicates exception."); - } - firstRegister = false; + ParquetOutputFormat.getMemoryManager() + .registerScaleCallBack("increment-test-counter", callback); + Assert.fail("Duplicated registering callback should throw duplicates exception."); } catch (IllegalArgumentException e) { - if (firstRegister) { - Assert.fail("Registering the same callback first time should succeed."); - } + // expected + } + + // hit the limit once and clean up + RecordWriter writer1 = createWriter(1); + RecordWriter writer2 = createWriter(2); + RecordWriter writer3 = createWriter(3); + writer1.close(null); + writer2.close(null); + writer3.close(null); + + //Verify Callback mechanism + Assert.assertEquals("Allocations should be adjusted once", 1, counter); + Assert.assertEquals("Should not allow duplicate callbacks", + 1, ParquetOutputFormat.getMemoryManager().getScaleCallBacks().size()); + } + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + private RecordWriter createWriter(int index) throws Exception { + File file = temp.newFile(String.valueOf(index) + ".parquet"); + if (!file.delete()) { + throw new RuntimeException("Could not delete file: " + file); } + RecordWriter writer = parquetOutputFormat.getRecordWriter( + conf, new Path(file.toString()), + CompressionCodecName.UNCOMPRESSED); return writer; } - private void verifyRowGroupSize(int expectRowGroupSize) { - Set<InternalParquetRecordWriter> writers = parquetOutputFormat.getMemoryManager() - .getWriterList().keySet(); + private long getTotalAllocation() { + Set<InternalParquetRecordWriter> writers = ParquetOutputFormat + .getMemoryManager().getWriterList().keySet(); + long total = 0; for (InternalParquetRecordWriter writer : writers) { - Assert.assertEquals("wrong rowGroupSize", expectRowGroupSize, - writer.getRowGroupSizeThreshold(), 1); + total += writer.getRowGroupSizeThreshold(); } + return total; } }
