Repository: tez Updated Branches: refs/heads/master a70e16326 -> f0a9281ca
TEZ-3741. Tez outputs should free memory when closed (Jason Lowe via kshukla) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f0a9281c Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f0a9281c Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f0a9281c Branch: refs/heads/master Commit: f0a9281caf29c763c77bf784e9095140e7ca0c8a Parents: a70e163 Author: Kuhu Shukla <[email protected]> Authored: Mon Jun 19 08:38:27 2017 -0500 Committer: Kuhu Shukla <[email protected]> Committed: Mon Jun 19 08:38:27 2017 -0500 ---------------------------------------------------------------------- .../common/sort/impl/dflt/DefaultSorter.java | 14 +++++- .../output/OrderedPartitionedKVOutput.java | 1 + .../library/output/UnorderedKVOutput.java | 1 + .../output/UnorderedPartitionedKVOutput.java | 1 + .../sort/impl/dflt/TestDefaultSorter.java | 2 + .../library/output/OutputTestHelpers.java | 34 +++++++++++++- .../output/TestOrderedPartitionedKVOutput2.java | 43 +++++++++++++++++- .../library/output/TestUnorderedKVOutput2.java | 47 ++++++++++++++++++++ 8 files changed, 138 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/f0a9281c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java index a6f7cf7..1528076 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java @@ -78,7 +78,7 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab private final static int APPROX_HEADER_LENGTH = 150; // k/v accounting - private final IntBuffer kvmeta; // metadata overlay on backing store + private IntBuffer kvmeta; // metadata overlay on backing store int kvstart; // marks origin of spill metadata int kvend; // marks end of spill metadata int kvindex; // marks end of fully serialized records @@ -91,7 +91,7 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab int bufvoid; // marks the point where we should stop // reading at the end of the buffer - private final byte[] kvbuffer; // main output buffer + private byte[] kvbuffer; // main output buffer private final byte[] b0 = new byte[0]; protected static final int VALSTART = 0; // val offset in acct @@ -749,6 +749,16 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab } } + @Override + public void close() throws IOException { + super.close(); + kvbuffer = null; + kvmeta = null; + } + + boolean isClosed() { + return kvbuffer == null && kvmeta == null; + } protected class SpillThread extends Thread { http://git-wip-us.apache.org/repos/asf/tez/blob/f0a9281c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java index 98e14be..7d3e0b4 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java @@ -187,6 +187,7 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput { sorter.close(); this.endTime = System.nanoTime(); returnEvents = generateEvents(); + sorter = null; } else { LOG.warn(getContext().getDestinationVertexName() + ": Attempting to close output {} of type {} before it was started. Generating empty events", http://git-wip-us.apache.org/repos/asf/tez/blob/f0a9281c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java index 3689e5c..51521e4 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java @@ -127,6 +127,7 @@ public class UnorderedKVOutput extends AbstractLogicalOutput { if (isStarted.get()) { //TODO: Do we need to support sending payloads via events? returnEvents = kvWriter.close(); + kvWriter = null; } else { LOG.warn(getContext().getDestinationVertexName() + ": Attempting to close output {} of type {} before it was started. Generating empty events", http://git-wip-us.apache.org/repos/asf/tez/blob/f0a9281c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java index eeca066..e83f1e9 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java @@ -104,6 +104,7 @@ public class UnorderedPartitionedKVOutput extends AbstractLogicalOutput { List<Event> returnEvents = null; if (isStarted.get()) { returnEvents = kvWriter.close(); + kvWriter = null; } else { LOG.warn(getContext().getDestinationVertexName() + ": Attempting to close output {} of type {} before it was started. Generating empty events", http://git-wip-us.apache.org/repos/asf/tez/blob/f0a9281c/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java index 73d249c..b3b16d9 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java @@ -280,6 +280,7 @@ public class TestDefaultSorter { try { sorter.flush(); sorter.close(); + assertTrue(sorter.isClosed()); assertTrue(sorter.getFinalOutputFile().getParent().getName().equalsIgnoreCase(UniqueID)); verifyCounters(sorter, context); } catch(Exception e) { @@ -302,6 +303,7 @@ public class TestDefaultSorter { try { sorter.flush(); sorter.close(); + assertTrue(sorter.isClosed()); assertTrue(sorter.getFinalOutputFile().getParent().getName().equalsIgnoreCase(UniqueID + "_0")); verifyCounters(sorter, context); http://git-wip-us.apache.org/repos/asf/tez/blob/f0a9281c/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/OutputTestHelpers.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/OutputTestHelpers.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/OutputTestHelpers.java index db9a0ed..573d53e 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/OutputTestHelpers.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/OutputTestHelpers.java @@ -14,20 +14,29 @@ package org.apache.tez.runtime.library.output; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import java.io.IOException; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.tez.common.TezUtils; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.UserPayload; +import org.apache.tez.runtime.api.MemoryUpdateCallback; import org.apache.tez.runtime.api.OutputContext; import org.apache.tez.runtime.api.OutputStatisticsReporter; +import org.apache.tez.runtime.api.impl.ExecutionContextImpl; +import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; -public class OutputTestHelpers { +class OutputTestHelpers { static OutputContext createOutputContext() throws IOException { OutputContext outputContext = mock(OutputContext.class); Configuration conf = new TezConfiguration(); @@ -44,4 +53,27 @@ public class OutputTestHelpers { doReturn(statsReporter).when(outputContext).getStatisticsReporter(); return outputContext; } + + static OutputContext createOutputContext(Configuration conf, Path workingDir) throws IOException { + OutputContext ctx = mock(OutputContext.class); + doAnswer(new Answer<Void>() { + @Override public Void answer(InvocationOnMock invocation) throws Throwable { + long requestedSize = (Long) invocation.getArguments()[0]; + MemoryUpdateCallbackHandler callback = (MemoryUpdateCallbackHandler) invocation + .getArguments()[1]; + callback.memoryAssigned(requestedSize); + return null; + } + }).when(ctx).requestInitialMemory(anyLong(), any(MemoryUpdateCallback.class)); + doReturn(TezUtils.createUserPayloadFromConf(conf)).when(ctx).getUserPayload(); + doReturn("destinationVertex").when(ctx).getDestinationVertexName(); + doReturn("UUID").when(ctx).getUniqueIdentifier(); + doReturn(new String[] { workingDir.toString() }).when(ctx).getWorkDirs(); + doReturn(200 * 1024 * 1024l).when(ctx).getTotalMemoryAvailableToTask(); + doReturn(new TezCounters()).when(ctx).getCounters(); + OutputStatisticsReporter statsReporter = mock(OutputStatisticsReporter.class); + doReturn(statsReporter).when(ctx).getStatisticsReporter(); + doReturn(new ExecutionContextImpl("localhost")).when(ctx).getExecutionContext(); + return ctx; + } } http://git-wip-us.apache.org/repos/asf/tez/blob/f0a9281c/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOrderedPartitionedKVOutput2.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOrderedPartitionedKVOutput2.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOrderedPartitionedKVOutput2.java index 8e76a8b..f226b7c 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOrderedPartitionedKVOutput2.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOrderedPartitionedKVOutput2.java @@ -23,22 +23,53 @@ import java.util.BitSet; import java.util.List; import com.google.protobuf.ByteString; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; import org.apache.tez.common.TezCommonUtils; +import org.apache.tez.common.TezRuntimeFrameworkConfigs; import org.apache.tez.common.TezUtilsInternal; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.OutputContext; import org.apache.tez.runtime.api.events.CompositeDataMovementEvent; import org.apache.tez.runtime.api.events.VertexManagerEvent; +import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; +import org.apache.tez.runtime.library.partitioner.HashPartitioner; import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads; +import org.junit.After; +import org.junit.Before; import org.junit.Test; // Tests which don't require parameterization public class TestOrderedPartitionedKVOutput2 { + private Configuration conf; + private FileSystem localFs; + private Path workingDir; + @Before + public void setup() throws IOException { + conf = new Configuration(); + localFs = FileSystem.getLocal(conf); + workingDir = new Path(System.getProperty("test.build.data", + System.getProperty("java.io.tmpdir", "/tmp")), + TestUnorderedKVOutput2.class.getName()).makeQualified( + localFs.getUri(), localFs.getWorkingDirectory()); + conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName()); + conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, Text.class.getName()); + conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS, + HashPartitioner.class.getName()); + conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, workingDir.toString()); + } + + @After + public void cleanup() throws IOException { + localFs.delete(workingDir, true); + } @Test(timeout = 5000) public void testNonStartedOutput() throws IOException { - OutputContext outputContext = OutputTestHelpers.createOutputContext(); + OutputContext outputContext = OutputTestHelpers.createOutputContext(conf, workingDir); int numPartitions = 10; OrderedPartitionedKVOutput output = new OrderedPartitionedKVOutput(outputContext, numPartitions); output.initialize(); @@ -63,5 +94,13 @@ public class TestOrderedPartitionedKVOutput2 { } } - + @Test(timeout = 10000) + public void testClose() throws Exception { + OutputContext outputContext = OutputTestHelpers.createOutputContext(conf, workingDir); + int numPartitions = 10; + OrderedPartitionedKVOutput output = new OrderedPartitionedKVOutput(outputContext, numPartitions); + output.initialize(); + output.start(); + output.close(); + } } http://git-wip-us.apache.org/repos/asf/tez/blob/f0a9281c/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestUnorderedKVOutput2.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestUnorderedKVOutput2.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestUnorderedKVOutput2.java index ecc1241..792b03f 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestUnorderedKVOutput2.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestUnorderedKVOutput2.java @@ -15,23 +15,58 @@ package org.apache.tez.runtime.library.output; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.BitSet; import java.util.List; import com.google.protobuf.ByteString; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; import org.apache.tez.common.TezCommonUtils; +import org.apache.tez.common.TezRuntimeFrameworkConfigs; import org.apache.tez.common.TezUtilsInternal; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.OutputContext; import org.apache.tez.runtime.api.events.DataMovementEvent; +import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; +import org.apache.tez.runtime.library.partitioner.HashPartitioner; import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads; +import org.junit.After; +import org.junit.Before; import org.junit.Test; // Tests which don't require parameterization public class TestUnorderedKVOutput2 { + private Configuration conf; + private FileSystem localFs; + private Path workingDir; + + @Before + public void setup() throws IOException { + conf = new Configuration(); + localFs = FileSystem.getLocal(conf); + workingDir = new Path(System.getProperty("test.build.data", + System.getProperty("java.io.tmpdir", "/tmp")), + TestUnorderedKVOutput2.class.getName()).makeQualified( + localFs.getUri(), localFs.getWorkingDirectory()); + conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName()); + conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, Text.class.getName()); + conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS, + HashPartitioner.class.getName()); + conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, workingDir.toString()); + } + + @After + public void cleanup() throws IOException { + localFs.delete(workingDir, true); + } @Test(timeout = 5000) public void testNonStartedOutput() throws Exception { @@ -57,4 +92,16 @@ public class TestUnorderedKVOutput2 { assertTrue(emptyPartionsBitSet.get(i)); } } + + @Test(timeout = 10000) + public void testClose() throws Exception { + OutputContext outputContext = OutputTestHelpers.createOutputContext(conf, workingDir); + int numPartitions = 1; + UnorderedKVOutput output = new UnorderedKVOutput(outputContext, numPartitions); + output.initialize(); + output.start(); + assertNotNull(output.getWriter()); + output.close(); + assertNull(output.getWriter()); + } }
