ACCUMULO-2182 Backport of ACCUMULO-2104 and ACCUMULO-2106 to 1.4.x
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/5c50e42b Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/5c50e42b Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/5c50e42b Branch: refs/heads/1.4.5-SNAPSHOT Commit: 5c50e42be24bf58d3aa44e29baf491e26dfd8994 Parents: ed4c227 Author: Bill Havanki <[email protected]> Authored: Mon Jan 13 15:54:36 2014 -0500 Committer: Bill Havanki <[email protected]> Committed: Mon Jan 13 17:04:46 2014 -0500 ---------------------------------------------------------------------- .../accumulo/server/test/randomwalk/State.java | 13 +++++++++++++ .../server/test/randomwalk/image/ImageFixture.java | 17 ++++++++++++++++- .../randomwalk/multitable/MultiTableFixture.java | 14 ++++++++++++++ .../randomwalk/sequential/SequentialFixture.java | 14 ++++++++++++++ .../server/test/randomwalk/shard/ShardFixture.java | 15 +++++++++++++++ 5 files changed, 72 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/5c50e42b/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/State.java ---------------------------------------------------------------------- diff --git a/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/State.java b/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/State.java index 344f3b3..f9bd84e 100644 --- a/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/State.java +++ b/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/State.java @@ -24,6 +24,7 @@ import java.util.Properties; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.MultiTableBatchWriter; +import org.apache.accumulo.core.client.MutationsRejectedException; import org.apache.accumulo.core.client.ZooKeeperInstance; import org.apache.accumulo.server.client.HdfsZooInstance; import org.apache.log4j.Logger; @@ -115,6 +116,18 @@ public class State { return mtbw; } + public boolean isMultiTableBatchWriterInitialized() { + return mtbw != null; + } + + public void resetMultiTableBatchWriter() { + if (!mtbw.isClosed()) { + log.warn("Setting non-closed MultiTableBatchWriter to null (leaking resources)"); + } + + mtbw = null; + } + public String getMapReduceJars() { String acuHome = System.getenv("ACCUMULO_HOME"); http://git-wip-us.apache.org/repos/asf/accumulo/blob/5c50e42b/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/image/ImageFixture.java ---------------------------------------------------------------------- diff --git a/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/image/ImageFixture.java b/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/image/ImageFixture.java index fe406e7..44741a2 100644 --- a/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/image/ImageFixture.java +++ b/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/image/ImageFixture.java @@ -27,6 +27,8 @@ import java.util.TreeSet; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.MultiTableBatchWriter; +import org.apache.accumulo.core.client.MutationsRejectedException; import org.apache.accumulo.core.client.TableExistsException; import org.apache.accumulo.core.client.impl.Tables; import org.apache.accumulo.server.test.randomwalk.Fixture; @@ -105,7 +107,20 @@ public class ImageFixture extends Fixture { @Override public void tearDown(State state) throws Exception { - + // We have resources we need to clean up + if (state.isMultiTableBatchWriterInitialized()) { + MultiTableBatchWriter mtbw = state.getMultiTableBatchWriter(); + try { + mtbw.close(); + } catch (MutationsRejectedException e) { + log.error("Ignoring mutations that weren't flushed", e); + } + + // Reset the MTBW on the state to null + state.resetMultiTableBatchWriter(); + } + + // Now we can safely delete the tables log.debug("Dropping tables: " + imageTableName + " " + indexTableName); Connector conn = state.getConnector(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/5c50e42b/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/multitable/MultiTableFixture.java ---------------------------------------------------------------------- diff --git a/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/multitable/MultiTableFixture.java b/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/multitable/MultiTableFixture.java index d7f65ae..55850f6 100644 --- a/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/multitable/MultiTableFixture.java +++ b/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/multitable/MultiTableFixture.java @@ -20,6 +20,8 @@ import java.net.InetAddress; import java.util.ArrayList; import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.MultiTableBatchWriter; +import org.apache.accumulo.core.client.MutationsRejectedException; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.server.test.randomwalk.Fixture; import org.apache.accumulo.server.test.randomwalk.State; @@ -40,6 +42,18 @@ public class MultiTableFixture extends Fixture { @Override public void tearDown(State state) throws Exception { + // We have resources we need to clean up + if (state.isMultiTableBatchWriterInitialized()) { + MultiTableBatchWriter mtbw = state.getMultiTableBatchWriter(); + try { + mtbw.close(); + } catch (MutationsRejectedException e) { + log.error("Ignoring mutations that weren't flushed", e); + } + + // Reset the MTBW on the state to null + state.resetMultiTableBatchWriter(); + } Connector conn = state.getConnector(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/5c50e42b/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/sequential/SequentialFixture.java ---------------------------------------------------------------------- diff --git a/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/sequential/SequentialFixture.java b/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/sequential/SequentialFixture.java index 5714893..dbc1aab 100644 --- a/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/sequential/SequentialFixture.java +++ b/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/sequential/SequentialFixture.java @@ -20,6 +20,8 @@ import java.net.InetAddress; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.MultiTableBatchWriter; +import org.apache.accumulo.core.client.MutationsRejectedException; import org.apache.accumulo.core.client.TableExistsException; import org.apache.accumulo.core.client.impl.Tables; import org.apache.accumulo.server.test.randomwalk.Fixture; @@ -55,6 +57,18 @@ public class SequentialFixture extends Fixture { @Override public void tearDown(State state) throws Exception { + // We have resources we need to clean up + if (state.isMultiTableBatchWriterInitialized()) { + MultiTableBatchWriter mtbw = state.getMultiTableBatchWriter(); + try { + mtbw.close(); + } catch (MutationsRejectedException e) { + log.error("Ignoring mutations that weren't flushed", e); + } + + // Reset the MTBW on the state to null + state.resetMultiTableBatchWriter(); + } log.debug("Dropping tables: " + seqTableName); http://git-wip-us.apache.org/repos/asf/accumulo/blob/5c50e42b/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/shard/ShardFixture.java ---------------------------------------------------------------------- diff --git a/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/shard/ShardFixture.java b/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/shard/ShardFixture.java index b91c81c..a9f8ded 100644 --- a/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/shard/ShardFixture.java +++ b/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/shard/ShardFixture.java @@ -22,6 +22,8 @@ import java.util.SortedSet; import java.util.TreeSet; import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.MultiTableBatchWriter; +import org.apache.accumulo.core.client.MutationsRejectedException; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.server.test.randomwalk.Fixture; import org.apache.accumulo.server.test.randomwalk.State; @@ -94,6 +96,19 @@ public class ShardFixture extends Fixture { @Override public void tearDown(State state) throws Exception { + // We have resources we need to clean up + if (state.isMultiTableBatchWriterInitialized()) { + MultiTableBatchWriter mtbw = state.getMultiTableBatchWriter(); + try { + mtbw.close(); + } catch (MutationsRejectedException e) { + log.error("Ignoring mutations that weren't flushed", e); + } + + // Reset the MTBW on the state to null + state.resetMultiTableBatchWriter(); + } + Connector conn = state.getConnector(); conn.tableOperations().delete((String) state.get("indexTableName"));
