This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new 140202c Broke ExternalCompactionIT into 3 different IT classes 140202c is described below commit 140202c00aca8e99bbc424d07b8c329c9ef009c8 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Fri Sep 24 09:21:59 2021 -0400 Broke ExternalCompactionIT into 3 different IT classes Broke ExternalCompactionIT into 3 different IT classes, also moved the compaction related ITs to a compaction package Co-authored-by: Jeffrey Manno <jeffreymann...@gmail.com> --- .../apache/accumulo/test/ExternalCompactionIT.java | 1229 -------------------- .../org/apache/accumulo/test/ShellServerIT.java | 1 + .../{ => compaction}/CompactionExecutorIT.java | 2 +- .../CompactionRateLimitingDeprecatedIT.java | 2 +- .../{ => compaction}/CompactionRateLimitingIT.java | 2 +- .../ConfigurableMajorCompactionIT.java | 2 +- .../ExternalCompactionTServer.java | 2 +- .../test/compaction/ExternalCompactionUtils.java | 252 ++++ .../test/compaction/ExternalCompaction_1_IT.java | 617 ++++++++++ .../test/compaction/ExternalCompaction_2_IT.java | 364 ++++++ .../test/compaction/ExternalCompaction_3_IT.java | 187 +++ .../ExternalDoNothingCompactor.java | 2 +- ...ttingExternalCompactionThriftClientHandler.java | 2 +- .../{ => compaction}/SizeCompactionStrategy.java | 2 +- .../test/{ => compaction}/SplitCancelsMajCIT.java | 2 +- .../TestCompactionCoordinator.java | 2 +- .../TestCompactionCoordinatorForOfflineTable.java | 2 +- .../{ => compaction}/TestCompactionStrategy.java | 2 +- .../{ => compaction}/UserCompactionStrategyIT.java | 2 +- .../accumulo/test/functional/CompactionIT.java | 4 +- 20 files changed, 1436 insertions(+), 1244 deletions(-) diff --git a/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java b/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java deleted file mode 100644 index a6ca80f..0000000 --- a/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java +++ /dev/null @@ 
-1,1229 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.accumulo.test; - -import static org.apache.accumulo.minicluster.ServerType.TABLET_SERVER; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.io.File; -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.net.http.HttpClient; -import java.net.http.HttpClient.Redirect; -import java.net.http.HttpClient.Version; -import java.net.http.HttpRequest; -import java.net.http.HttpResponse; -import java.net.http.HttpResponse.BodyHandlers; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.EnumSet; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Optional; -import java.util.Set; -import java.util.SortedSet; -import java.util.TreeSet; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import java.util.stream.Collectors; 
-import java.util.stream.Stream; - -import org.apache.accumulo.compactor.Compactor; -import org.apache.accumulo.compactor.ExtCEnv.CompactorIterEnv; -import org.apache.accumulo.coordinator.CompactionCoordinator; -import org.apache.accumulo.coordinator.ExternalCompactionMetrics; -import org.apache.accumulo.core.client.Accumulo; -import org.apache.accumulo.core.client.AccumuloClient; -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.IteratorSetting; -import org.apache.accumulo.core.client.MutationsRejectedException; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.client.admin.CompactionConfig; -import org.apache.accumulo.core.client.admin.NewTableConfiguration; -import org.apache.accumulo.core.client.admin.PluginConfig; -import org.apache.accumulo.core.client.admin.compaction.CompactableFile; -import org.apache.accumulo.core.client.admin.compaction.CompactionSelector; -import org.apache.accumulo.core.client.admin.compaction.CompressionConfigurer; -import org.apache.accumulo.core.clientImpl.Tables; -import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.TableId; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.iterators.DevNull; -import org.apache.accumulo.core.iterators.Filter; -import org.apache.accumulo.core.iterators.IteratorEnvironment; -import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; -import org.apache.accumulo.core.iterators.SortedKeyValueIterator; -import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; -import org.apache.accumulo.core.metadata.schema.ExternalCompactionFinalState; -import 
org.apache.accumulo.core.metadata.schema.ExternalCompactionFinalState.FinalState; -import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; -import org.apache.accumulo.core.metadata.schema.ExternalCompactionMetadata; -import org.apache.accumulo.core.metadata.schema.TabletMetadata; -import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; -import org.apache.accumulo.core.metadata.schema.TabletsMetadata; -import org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner; -import org.apache.accumulo.core.spi.compaction.SimpleCompactionDispatcher; -import org.apache.accumulo.fate.util.UtilWaitThread; -import org.apache.accumulo.harness.MiniClusterConfigurationCallback; -import org.apache.accumulo.harness.SharedMiniClusterBase; -import org.apache.accumulo.minicluster.ServerType; -import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl.ProcessInfo; -import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; -import org.apache.accumulo.miniclusterImpl.ProcessReference; -import org.apache.commons.io.input.Tailer; -import org.apache.commons.io.input.TailerListenerAdapter; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.RawLocalFileSystem; -import org.apache.hadoop.io.Text; -import org.bouncycastle.util.Arrays; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Preconditions; -import com.google.gson.Gson; - -public class ExternalCompactionIT extends SharedMiniClusterBase - implements MiniClusterConfigurationCallback { - - private static final Logger LOG = LoggerFactory.getLogger(ExternalCompactionIT.class); - - private static final int MAX_DATA = 1000; - - private HttpRequest req = null; - { - try { - req = HttpRequest.newBuilder().GET().uri(new URI("http://localhost:9099/metrics")).build(); - } catch (URISyntaxException e) { - throw new RuntimeException(e); - } - } - private 
final HttpClient hc = - HttpClient.newBuilder().version(Version.HTTP_1_1).followRedirects(Redirect.NORMAL).build(); - - private static String row(int r) { - return String.format("r:%04d", r); - } - - @Override - public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreSite) { - cfg.setProperty("tserver.compaction.major.service.cs1.planner", - DefaultCompactionPlanner.class.getName()); - cfg.setProperty("tserver.compaction.major.service.cs1.planner.opts.executors", - "[{'name':'all', 'type': 'external', 'queue': 'DCQ1'}]"); - cfg.setProperty("tserver.compaction.major.service.cs2.planner", - DefaultCompactionPlanner.class.getName()); - cfg.setProperty("tserver.compaction.major.service.cs2.planner.opts.executors", - "[{'name':'all', 'type': 'external','queue': 'DCQ2'}]"); - cfg.setProperty(Property.COMPACTION_COORDINATOR_DEAD_COMPACTOR_CHECK_INTERVAL.getKey(), "5s"); - cfg.setProperty(Property.COMPACTION_COORDINATOR_TSERVER_COMPACTION_CHECK_INTERVAL, "3s"); - cfg.setProperty(Property.COMPACTOR_PORTSEARCH, "true"); - // use raw local file system so walogs sync and flush will work - coreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); - } - - public static class TestFilter extends Filter { - - int modulus = 1; - - @Override - public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, - IteratorEnvironment env) throws IOException { - super.init(source, options, env); - - // this cast should fail if the compaction is running in the tserver - CompactorIterEnv cienv = (CompactorIterEnv) env; - - Preconditions.checkArgument(!cienv.getQueueName().isEmpty()); - Preconditions - .checkArgument(options.getOrDefault("expectedQ", "").equals(cienv.getQueueName())); - Preconditions.checkArgument(cienv.isUserCompaction()); - Preconditions.checkArgument(cienv.getIteratorScope() == IteratorScope.majc); - Preconditions.checkArgument(!cienv.isSamplingEnabled()); - - // if the init function is never called at all, then not 
setting the modulus option should - // cause the test to fail - if (options.containsKey("modulus")) { - Preconditions.checkArgument(!options.containsKey("pmodulus")); - Preconditions.checkArgument(cienv.isFullMajorCompaction()); - modulus = Integer.parseInt(options.get("modulus")); - } - - // use when partial compaction is expected - if (options.containsKey("pmodulus")) { - Preconditions.checkArgument(!options.containsKey("modulus")); - Preconditions.checkArgument(!cienv.isFullMajorCompaction()); - modulus = Integer.parseInt(options.get("pmodulus")); - } - } - - @Override - public boolean accept(Key k, Value v) { - return Integer.parseInt(v.toString()) % modulus == 0; - } - - } - - @Before - public void setUp() throws Exception { - if (SharedMiniClusterBase.getCluster() == null) { - SharedMiniClusterBase.startMiniClusterWithConfig(this); - } - } - - @After - public void tearDown() throws Exception { - // The tables need to be deleted between tests because MAC - // is not being restarted and it's possible that a test - // will not get the expected compaction. The compaction that - // is run during a test could be for a table from the previous - // test due to the way the previous test ended. - cleanupTables(); - } - - private void stopProcesses(ProcessInfo... 
processes) throws Exception { - for (ProcessInfo p : processes) { - if (p != null) { - Process proc = p.getProcess(); - if (proc.supportsNormalTermination()) { - proc.destroyForcibly(); - } else { - LOG.info("Stopping process manually"); - new ProcessBuilder("kill", Long.toString(proc.pid())).start(); - proc.waitFor(); - } - } - } - } - - private void cleanupTables() { - try (AccumuloClient client = Accumulo.newClient() - .from(SharedMiniClusterBase.getCluster().getClientProperties()).build()) { - for (String table : client.tableOperations().list()) { - try { - if (!table.startsWith("accumulo")) { - client.tableOperations().cancelCompaction(table); - client.tableOperations().delete(table); - } - } catch (Exception e) { - fail("Error deleting table: " + table + ", msg: " + e.getMessage()); - } - } - } - } - - private Stream<ExternalCompactionFinalState> getFinalStatesForTable(TableId tid) { - return getCluster().getServerContext().getAmple().getExternalCompactionFinalStates() - .filter(state -> state.getExtent().tableId().equals(tid)); - } - - @Test - public void testExternalCompaction() throws Exception { - ProcessInfo c1 = null, c2 = null, coord = null; - String[] names = this.getUniqueNames(2); - try (AccumuloClient client = Accumulo.newClient() - .from(SharedMiniClusterBase.getCluster().getClientProperties()).build()) { - - String table1 = names[0]; - createTable(client, table1, "cs1"); - - String table2 = names[1]; - createTable(client, table2, "cs2"); - - writeData(client, table1); - writeData(client, table2); - - c1 = SharedMiniClusterBase.getCluster().exec(Compactor.class, "-q", "DCQ1"); - c2 = SharedMiniClusterBase.getCluster().exec(Compactor.class, "-q", "DCQ2"); - coord = SharedMiniClusterBase.getCluster().exec(CompactionCoordinator.class); - - compact(client, table1, 2, "DCQ1", true); - verify(client, table1, 2); - - SortedSet<Text> splits = new TreeSet<>(); - splits.add(new Text(row(MAX_DATA / 2))); - client.tableOperations().addSplits(table2, splits); 
- - compact(client, table2, 3, "DCQ2", true); - verify(client, table2, 3); - - } finally { - // Stop the Compactor and Coordinator that we started - stopProcesses(c1, c2, coord); - } - } - - @Test - public void testSplitDuringExternalCompaction() throws Exception { - ProcessInfo c1 = null, coord = null; - String table1 = this.getUniqueNames(1)[0]; - try (AccumuloClient client = Accumulo.newClient() - .from(SharedMiniClusterBase.getCluster().getClientProperties()).build()) { - createTable(client, table1, "cs1"); - TableId tid = Tables.getTableId(getCluster().getServerContext(), table1); - writeData(client, table1); - - c1 = SharedMiniClusterBase.getCluster().exec(ExternalDoNothingCompactor.class, "-q", "DCQ1"); - coord = SharedMiniClusterBase.getCluster().exec(TestCompactionCoordinator.class); - compact(client, table1, 2, "DCQ1", false); - - // Wait for the compaction to start by waiting for 1 external compaction column - Set<ExternalCompactionId> ecids = new HashSet<>(); - do { - UtilWaitThread.sleep(50); - try (TabletsMetadata tm = getCluster().getServerContext().getAmple().readTablets() - .forTable(tid).fetch(ColumnType.ECOMP).build()) { - tm.stream().flatMap(t -> t.getExternalCompactions().keySet().stream()) - .forEach(ecids::add); - } - } while (ecids.isEmpty()); - - // ExternalDoNothingCompactor will not compact, it will wait, split the table. 
- SortedSet<Text> splits = new TreeSet<>(); - int jump = MAX_DATA / 5; - for (int r = jump; r < MAX_DATA; r += jump) { - splits.add(new Text(row(r))); - } - - assertEquals(0, getCoordinatorMetrics().getFailed()); - - client.tableOperations().addSplits(table1, splits); - - // wait for failure or test timeout - ExternalCompactionMetrics metrics = getCoordinatorMetrics(); - while (metrics.getFailed() == 0) { - UtilWaitThread.sleep(250); - metrics = getCoordinatorMetrics(); - } - - // Check that there is one failed compaction in the coordinator metrics - assertTrue(metrics.getStarted() > 0); - assertEquals(0, metrics.getCompleted()); - assertEquals(1, metrics.getFailed()); - - // ensure compaction ids were deleted by split operation from metadata table - try (TabletsMetadata tm = getCluster().getServerContext().getAmple().readTablets() - .forTable(tid).fetch(ColumnType.ECOMP).build()) { - Set<ExternalCompactionId> ecids2 = tm.stream() - .flatMap(t -> t.getExternalCompactions().keySet().stream()).collect(Collectors.toSet()); - assertTrue(Collections.disjoint(ecids, ecids2)); - } - } finally { - stopProcesses(c1, coord); - } - } - - @Test - public void testCoordinatorRestartsDuringCompaction() throws Exception { - ProcessInfo c1 = null, coord = null; - String table1 = this.getUniqueNames(1)[0]; - try (AccumuloClient client = Accumulo.newClient() - .from(SharedMiniClusterBase.getCluster().getClientProperties()).build()) { - createTable(client, table1, "cs1", 2); - writeData(client, table1); - c1 = SharedMiniClusterBase.getCluster().exec(ExternalDoNothingCompactor.class, "-q", "DCQ1"); - coord = SharedMiniClusterBase.getCluster().exec(CompactionCoordinator.class); - compact(client, table1, 2, "DCQ1", false); - TableId tid = Tables.getTableId(getCluster().getServerContext(), table1); - // Wait for the compaction to start by waiting for 1 external compaction column - Set<ExternalCompactionId> ecids = new HashSet<>(); - do { - UtilWaitThread.sleep(50); - try (TabletsMetadata 
tm = getCluster().getServerContext().getAmple().readTablets() - .forTable(tid).fetch(ColumnType.ECOMP).build()) { - tm.stream().flatMap(t -> t.getExternalCompactions().keySet().stream()) - .forEach(ecids::add); - } - } while (ecids.isEmpty()); - - // Stop the Coordinator - stopProcesses(coord); - - // Start the TestCompactionCoordinator so that we have - // access to the metrics. - coord = SharedMiniClusterBase.getCluster().exec(TestCompactionCoordinator.class); - - // Wait for coordinator to start - ExternalCompactionMetrics metrics = null; - while (null == metrics) { - try { - metrics = getCoordinatorMetrics(); - } catch (Exception e) { - UtilWaitThread.sleep(250); - } - } - - // wait for failure or test timeout - metrics = getCoordinatorMetrics(); - while (metrics.getRunning() == 0) { - UtilWaitThread.sleep(250); - metrics = getCoordinatorMetrics(); - } - } finally { - stopProcesses(c1, coord); - } - } - - @Test - public void testCompactionAndCompactorDies() throws Exception { - String table1 = this.getUniqueNames(1)[0]; - try (AccumuloClient client = Accumulo.newClient() - .from(SharedMiniClusterBase.getCluster().getClientProperties()).build()) { - // Stop the TabletServer so that it does not commit the compaction - getCluster().getProcesses().get(TABLET_SERVER).forEach(p -> { - try { - getCluster().killProcess(TABLET_SERVER, p); - } catch (Exception e) { - fail("Failed to shutdown tablet server"); - } - }); - // Start our TServer that will not commit the compaction - ProcessInfo tserv = SharedMiniClusterBase.getCluster().exec(ExternalCompactionTServer.class); - - createTable(client, table1, "cs1", 2); - writeData(client, table1); - ProcessInfo c1 = - SharedMiniClusterBase.getCluster().exec(ExternalDoNothingCompactor.class, "-q", "DCQ1"); - ProcessInfo coord = SharedMiniClusterBase.getCluster().exec(CompactionCoordinator.class); - compact(client, table1, 2, "DCQ1", false); - TableId tid = Tables.getTableId(getCluster().getServerContext(), table1); - // Wait for 
the compaction to start by waiting for 1 external compaction column - Set<ExternalCompactionId> ecids = new HashSet<>(); - do { - UtilWaitThread.sleep(250); - try (TabletsMetadata tm = getCluster().getServerContext().getAmple().readTablets() - .forTable(tid).fetch(ColumnType.ECOMP).build()) { - tm.stream().flatMap(t -> t.getExternalCompactions().keySet().stream()) - .forEach(ecids::add); - } - } while (ecids.isEmpty()); - - // Kill the compactor - stopProcesses(c1); - - // DeadCompactionDetector in the CompactionCoordinator should fail the compaction. - long count = 0; - while (count == 0) { - count = this.getFinalStatesForTable(tid) - .filter(state -> state.getFinalState().equals(FinalState.FAILED)).count(); - UtilWaitThread.sleep(250); - } - - // Stop the processes we started - stopProcesses(tserv, coord); - } finally { - // We stopped the TServer and started our own, restart the original TabletServers - getCluster().getClusterControl().start(ServerType.TABLET_SERVER); - } - - } - - @Test - public void testMergeDuringExternalCompaction() throws Exception { - ProcessInfo c1 = null, coord = null; - String table1 = this.getUniqueNames(1)[0]; - try (AccumuloClient client = Accumulo.newClient() - .from(SharedMiniClusterBase.getCluster().getClientProperties()).build()) { - - createTable(client, table1, "cs1", 2); - // set compaction ratio to 1 so that majc occurs naturally, not user compaction - // user compaction blocks merge - client.tableOperations().setProperty(table1, Property.TABLE_MAJC_RATIO.toString(), "1.0"); - // cause multiple rfiles to be created - writeData(client, table1); - writeData(client, table1); - writeData(client, table1); - writeData(client, table1); - - TableId tid = Tables.getTableId(getCluster().getServerContext(), table1); - - c1 = SharedMiniClusterBase.getCluster().exec(ExternalDoNothingCompactor.class, "-q", "DCQ1"); - coord = SharedMiniClusterBase.getCluster().exec(TestCompactionCoordinator.class); - - // Wait for the compaction to start by 
waiting for 1 external compaction column - Set<ExternalCompactionId> ecids = new HashSet<>(); - do { - UtilWaitThread.sleep(50); - try (TabletsMetadata tm = getCluster().getServerContext().getAmple().readTablets() - .forTable(tid).fetch(ColumnType.ECOMP).build()) { - tm.stream().flatMap(t -> t.getExternalCompactions().keySet().stream()) - .forEach(ecids::add); - } - } while (ecids.isEmpty()); - - var md = new ArrayList<TabletMetadata>(); - try (TabletsMetadata tm = getCluster().getServerContext().getAmple().readTablets() - .forTable(tid).fetch(ColumnType.PREV_ROW).build()) { - tm.forEach(t -> md.add(t)); - assertEquals(2, md.size()); - } - - assertEquals(0, getCoordinatorMetrics().getFailed()); - - // Merge - blocking operation - Text start = md.get(0).getPrevEndRow(); - Text end = md.get(1).getEndRow(); - client.tableOperations().merge(table1, start, end); - - // wait for failure or test timeout - ExternalCompactionMetrics metrics = getCoordinatorMetrics(); - while (metrics.getFailed() == 0) { - UtilWaitThread.sleep(250); - metrics = getCoordinatorMetrics(); - } - - // Check that there is one failed compaction in the coordinator metrics - assertTrue(metrics.getStarted() > 0); - assertEquals(0, metrics.getCompleted()); - assertTrue(metrics.getFailed() > 0); - - // ensure compaction ids were deleted by merge operation from metadata table - try (TabletsMetadata tm = getCluster().getServerContext().getAmple().readTablets() - .forTable(tid).fetch(ColumnType.ECOMP).build()) { - Set<ExternalCompactionId> ecids2 = tm.stream() - .flatMap(t -> t.getExternalCompactions().keySet().stream()).collect(Collectors.toSet()); - // keep checking until test times out - while (!Collections.disjoint(ecids, ecids2)) { - UtilWaitThread.sleep(25); - ecids2 = tm.stream().flatMap(t -> t.getExternalCompactions().keySet().stream()) - .collect(Collectors.toSet()); - } - } finally { - stopProcesses(c1, coord); - } - } - } - - @Test - public void testManytablets() throws Exception { - ProcessInfo 
c1 = null, c2 = null, c3 = null, c4 = null, coord = null; - String table1 = this.getUniqueNames(1)[0]; - try (AccumuloClient client = Accumulo.newClient() - .from(SharedMiniClusterBase.getCluster().getClientProperties()).build()) { - - createTable(client, table1, "cs1", 200); - - writeData(client, table1); - - c1 = SharedMiniClusterBase.getCluster().exec(Compactor.class, "-q", "DCQ1"); - c2 = SharedMiniClusterBase.getCluster().exec(Compactor.class, "-q", "DCQ1"); - c3 = SharedMiniClusterBase.getCluster().exec(Compactor.class, "-q", "DCQ1"); - c4 = SharedMiniClusterBase.getCluster().exec(Compactor.class, "-q", "DCQ1"); - coord = SharedMiniClusterBase.getCluster().exec(CompactionCoordinator.class); - - compact(client, table1, 3, "DCQ1", true); - - verify(client, table1, 3); - } finally { - stopProcesses(c1, c2, c3, c4, coord); - } - } - - @Test - public void testExternalCompactionsRunWithTableOffline() throws Exception { - ProcessInfo c1 = null, coord = null; - String table1 = this.getUniqueNames(1)[0]; - try (AccumuloClient client = Accumulo.newClient() - .from(SharedMiniClusterBase.getCluster().getClientProperties()).build()) { - createTable(client, table1, "cs1"); - // set compaction ratio to 1 so that majc occurs naturally, not user compaction - // user compaction blocks merge - client.tableOperations().setProperty(table1, Property.TABLE_MAJC_RATIO.toString(), "1.0"); - // cause multiple rfiles to be created - writeData(client, table1); - writeData(client, table1); - writeData(client, table1); - writeData(client, table1); - - c1 = SharedMiniClusterBase.getCluster().exec(TestCompactionCoordinatorForOfflineTable.class); - - // Wait for coordinator to start - ExternalCompactionMetrics metrics = null; - while (null == metrics) { - try { - metrics = getCoordinatorMetrics(); - } catch (Exception e) { - UtilWaitThread.sleep(250); - } - } - - // Offline the table when the compaction starts - Thread t = new Thread(() -> { - try { - ExternalCompactionMetrics metrics2 = 
getCoordinatorMetrics(); - while (metrics2.getStarted() == 0) { - metrics2 = getCoordinatorMetrics(); - } - client.tableOperations().offline(table1, false); - } catch (Exception e) { - LOG.error("Error: ", e); - fail("Failed to offline table"); - } - }); - t.start(); - - TableId tid = Tables.getTableId(getCluster().getServerContext(), table1); - // Confirm that no final state is in the metadata table - assertEquals(0, this.getFinalStatesForTable(tid).count()); - - // Start the compactor - coord = SharedMiniClusterBase.getCluster().exec(Compactor.class, "-q", "DCQ1"); - - t.join(); - - // wait for completed or test timeout - metrics = getCoordinatorMetrics(); - while (metrics.getCompleted() == 0) { - UtilWaitThread.sleep(250); - metrics = getCoordinatorMetrics(); - } - - // Confirm that final state is in the metadata table - assertEquals(1, this.getFinalStatesForTable(tid).count()); - - // Online the table - client.tableOperations().online(table1); - - // wait for compaction to be committed by tserver or test timeout - long finalStateCount = this.getFinalStatesForTable(tid).count(); - while (finalStateCount > 0) { - UtilWaitThread.sleep(250); - finalStateCount = this.getFinalStatesForTable(tid).count(); - } - } finally { - stopProcesses(c1, coord); - } - } - - @Test - public void testUserCompactionCancellation() throws Exception { - ProcessInfo c1 = null, coord = null; - String table1 = this.getUniqueNames(1)[0]; - try (AccumuloClient client = Accumulo.newClient() - .from(SharedMiniClusterBase.getCluster().getClientProperties()).build()) { - - createTable(client, table1, "cs1"); - writeData(client, table1); - - // The ExternalDoNothingCompactor creates a compaction thread that - // sleeps for 5 minutes. 
- // Wait for the coordinator to insert the running compaction metadata - // entry into the metadata table, then cancel the compaction - c1 = SharedMiniClusterBase.getCluster().exec(ExternalDoNothingCompactor.class, "-q", "DCQ1"); - coord = SharedMiniClusterBase.getCluster().exec(TestCompactionCoordinator.class); - - compact(client, table1, 2, "DCQ1", false); - - TableId tid = Tables.getTableId(getCluster().getServerContext(), table1); - List<TabletMetadata> md = new ArrayList<>(); - TabletsMetadata tm = getCluster().getServerContext().getAmple().readTablets().forTable(tid) - .fetch(ColumnType.ECOMP).build(); - tm.forEach(t -> md.add(t)); - - while (md.size() == 0) { - tm.close(); - tm = getCluster().getServerContext().getAmple().readTablets().forTable(tid) - .fetch(ColumnType.ECOMP).build(); - tm.forEach(t -> md.add(t)); - } - - assertEquals(0, getCoordinatorMetrics().getFailed()); - - client.tableOperations().cancelCompaction(table1); - - // wait for failure or test timeout - ExternalCompactionMetrics metrics = getCoordinatorMetrics(); - while (metrics.getFailed() == 0) { - UtilWaitThread.sleep(250); - metrics = getCoordinatorMetrics(); - } - - assertEquals(1, metrics.getStarted()); - assertEquals(0, metrics.getRunning()); - assertEquals(0, metrics.getCompleted()); - assertEquals(1, metrics.getFailed()); - } finally { - stopProcesses(c1, coord); - } - } - - @Test - public void testDeleteTableDuringUserExternalCompaction() throws Exception { - ProcessInfo c1 = null, coord = null; - try (AccumuloClient client = Accumulo.newClient() - .from(SharedMiniClusterBase.getCluster().getClientProperties()).build()) { - - String table1 = "ectt6"; - createTable(client, table1, "cs1"); - writeData(client, table1); - - // The ExternalDoNothingCompactor creates a compaction thread that - // sleeps for 5 minutes. 
- // Wait for the coordinator to insert the running compaction metadata - // entry into the metadata table, then cancel the compaction - c1 = SharedMiniClusterBase.getCluster().exec(ExternalDoNothingCompactor.class, "-q", "DCQ1"); - coord = SharedMiniClusterBase.getCluster().exec(TestCompactionCoordinator.class); - - compact(client, table1, 2, "DCQ1", false); - - List<TabletMetadata> md = new ArrayList<>(); - TabletsMetadata tm = getCluster().getServerContext().getAmple().readTablets() - .forLevel(DataLevel.USER).fetch(ColumnType.ECOMP).build(); - tm.forEach(t -> md.add(t)); - - while (md.size() == 0) { - tm.close(); - tm = getCluster().getServerContext().getAmple().readTablets().forLevel(DataLevel.USER) - .fetch(ColumnType.ECOMP).build(); - tm.forEach(t -> md.add(t)); - } - - assertEquals(0, getCoordinatorMetrics().getFailed()); - - client.tableOperations().delete(table1); - - // wait for failure or test timeout - ExternalCompactionMetrics metrics = getCoordinatorMetrics(); - while (metrics.getFailed() == 0) { - UtilWaitThread.sleep(250); - metrics = getCoordinatorMetrics(); - } - - assertEquals(1, metrics.getStarted()); - assertEquals(0, metrics.getRunning()); - assertEquals(0, metrics.getCompleted()); - assertEquals(1, metrics.getFailed()); - } finally { - stopProcesses(c1, coord); - } - } - - @Test - public void testDeleteTableDuringExternalCompaction() throws Exception { - ProcessInfo c1 = null, coord = null; - String table1 = this.getUniqueNames(1)[0]; - try (AccumuloClient client = Accumulo.newClient() - .from(SharedMiniClusterBase.getCluster().getClientProperties()).build()) { - - createTable(client, table1, "cs1"); - // set compaction ratio to 1 so that majc occurs naturally, not user compaction - // user compaction blocks delete - client.tableOperations().setProperty(table1, Property.TABLE_MAJC_RATIO.toString(), "1.0"); - // cause multiple rfiles to be created - writeData(client, table1); - writeData(client, table1); - writeData(client, table1); - 
writeData(client, table1); - - // The ExternalDoNothingCompactor creates a compaction thread that - // sleeps for 5 minutes. The compaction should occur naturally. - // Wait for the coordinator to insert the running compaction metadata - // entry into the metadata table, then delete the table. - c1 = SharedMiniClusterBase.getCluster().exec(ExternalDoNothingCompactor.class, "-q", "DCQ1"); - coord = SharedMiniClusterBase.getCluster().exec(TestCompactionCoordinator.class); - - TableId tid = Tables.getTableId(getCluster().getServerContext(), table1); - LOG.warn("Tid for Table {} is {}", table1, tid); - List<TabletMetadata> md = new ArrayList<>(); - TabletsMetadata tm = getCluster().getServerContext().getAmple().readTablets().forTable(tid) - .fetch(ColumnType.ECOMP).build(); - tm.forEach(t -> md.add(t)); - - while (md.size() == 0) { - tm.close(); - tm = getCluster().getServerContext().getAmple().readTablets().forTable(tid) - .fetch(ColumnType.ECOMP).build(); - tm.forEach(t -> md.add(t)); - UtilWaitThread.sleep(250); - } - - assertEquals(0, getCoordinatorMetrics().getFailed()); - - client.tableOperations().delete(table1); - - // wait for failure or test timeout - ExternalCompactionMetrics metrics = getCoordinatorMetrics(); - while (metrics.getFailed() == 0) { - UtilWaitThread.sleep(50); - metrics = getCoordinatorMetrics(); - } - - tm = getCluster().getServerContext().getAmple().readTablets().forTable(tid) - .fetch(ColumnType.ECOMP).build(); - assertEquals(0, tm.stream().count()); - tm.close(); - - // The metadata tablets will be deleted from the metadata table because we have deleted the - // table. Verify that the compaction failed by looking at the metrics in the Coordinator. 
- assertEquals(1, metrics.getStarted()); - assertEquals(0, metrics.getRunning()); - assertEquals(0, metrics.getCompleted()); - assertEquals(1, metrics.getFailed()); - } finally { - stopProcesses(c1, coord); - } - } - - @Test - public void testConfigurer() throws Exception { - String tableName = this.getUniqueNames(1)[0]; - - ProcessInfo c1 = SharedMiniClusterBase.getCluster().exec(Compactor.class, "-q", "DCQ1"); - ProcessInfo coord = SharedMiniClusterBase.getCluster().exec(CompactionCoordinator.class); - - try (AccumuloClient client = Accumulo.newClient() - .from(SharedMiniClusterBase.getCluster().getClientProperties()).build()) { - - Map<String,String> props = Map.of("table.compaction.dispatcher", - SimpleCompactionDispatcher.class.getName(), "table.compaction.dispatcher.opts.service", - "cs1", Property.TABLE_FILE_COMPRESSION_TYPE.getKey(), "none"); - NewTableConfiguration ntc = new NewTableConfiguration().setProperties(props); - client.tableOperations().create(tableName, ntc); - - byte[] data = new byte[100000]; - Arrays.fill(data, (byte) 65); - try (var writer = client.createBatchWriter(tableName)) { - for (int row = 0; row < 10; row++) { - Mutation m = new Mutation(row + ""); - m.at().family("big").qualifier("stuff").put(data); - writer.addMutation(m); - } - } - client.tableOperations().flush(tableName, null, null, true); - - // without compression, expect file to be large - long sizes = CompactionExecutorIT.getFileSizes(client, tableName); - assertTrue("Unexpected files sizes : " + sizes, - sizes > data.length * 10 && sizes < data.length * 11); - - client.tableOperations().compact(tableName, - new CompactionConfig().setWait(true) - .setConfigurer(new PluginConfig(CompressionConfigurer.class.getName(), - Map.of(CompressionConfigurer.LARGE_FILE_COMPRESSION_TYPE, "gz", - CompressionConfigurer.LARGE_FILE_COMPRESSION_THRESHOLD, data.length + "")))); - - // after compacting with compression, expect small file - sizes = CompactionExecutorIT.getFileSizes(client, 
tableName); - assertTrue("Unexpected files sizes: data: " + data.length + ", file:" + sizes, - sizes < data.length); - - client.tableOperations().compact(tableName, new CompactionConfig().setWait(true)); - - // after compacting without compression, expect big files again - sizes = CompactionExecutorIT.getFileSizes(client, tableName); - assertTrue("Unexpected files sizes : " + sizes, - sizes > data.length * 10 && sizes < data.length * 11); - - } finally { - stopProcesses(c1, coord); - } - } - - public static class ExtDevNull extends DevNull { - @Override - public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, - IteratorEnvironment env) throws IOException { - super.init(source, options, env); - - // this cast should fail if the compaction is running in the tserver - CompactorIterEnv cienv = (CompactorIterEnv) env; - - Preconditions.checkArgument(!cienv.getQueueName().isEmpty()); - } - } - - @Test - public void testExternalCompactionWithTableIterator() throws Exception { - // in addition to testing table configured iters w/ external compaction, this also tests an - // external compaction that deletes everything - - ProcessInfo c1 = null, coord = null; - String table1 = this.getUniqueNames(1)[0]; - try (AccumuloClient client = Accumulo.newClient() - .from(SharedMiniClusterBase.getCluster().getClientProperties()).build()) { - createTable(client, table1, "cs1"); - writeData(client, table1); - c1 = SharedMiniClusterBase.getCluster().exec(Compactor.class, "-q", "DCQ1"); - coord = SharedMiniClusterBase.getCluster().exec(CompactionCoordinator.class); - compact(client, table1, 2, "DCQ1", true); - verify(client, table1, 2); - - IteratorSetting setting = new IteratorSetting(50, "delete", ExtDevNull.class); - client.tableOperations().attachIterator(table1, setting, EnumSet.of(IteratorScope.majc)); - client.tableOperations().compact(table1, new CompactionConfig().setWait(true)); - - try (Scanner s = client.createScanner(table1)) { - 
assertFalse(s.iterator().hasNext()); - } - } finally { - stopProcesses(c1, coord); - } - } - - @Test - public void testExternalCompactionDeadTServer() throws Exception { - // Shut down the normal TServers - getCluster().getProcesses().get(TABLET_SERVER).forEach(p -> { - try { - getCluster().killProcess(TABLET_SERVER, p); - } catch (Exception e) { - fail("Failed to shutdown tablet server"); - } - }); - // Start our TServer that will not commit the compaction - ProcessInfo tserv = SharedMiniClusterBase.getCluster().exec(ExternalCompactionTServer.class); - - final String table3 = this.getUniqueNames(1)[0]; - ProcessInfo c1 = null, coord = null; - try (final AccumuloClient client = Accumulo.newClient() - .from(SharedMiniClusterBase.getCluster().getClientProperties()).build()) { - createTable(client, table3, "cs1"); - writeData(client, table3); - c1 = SharedMiniClusterBase.getCluster().exec(Compactor.class, "-q", "DCQ1"); - coord = SharedMiniClusterBase.getCluster().exec(CompactionCoordinator.class); - compact(client, table3, 2, "DCQ1", false); - - // ExternalCompactionTServer will not commit the compaction. Wait for the - // metadata table entries to show up. 
- LOG.info("Waiting for external compaction to complete."); - TableId tid = Tables.getTableId(getCluster().getServerContext(), table3); - Stream<ExternalCompactionFinalState> fs = this.getFinalStatesForTable(tid); - while (fs.count() == 0) { - LOG.info("Waiting for compaction completed marker to appear"); - UtilWaitThread.sleep(250); - fs = this.getFinalStatesForTable(tid); - } - - LOG.info("Validating metadata table contents."); - TabletsMetadata tm = getCluster().getServerContext().getAmple().readTablets().forTable(tid) - .fetch(ColumnType.ECOMP).build(); - List<TabletMetadata> md = new ArrayList<>(); - tm.forEach(t -> md.add(t)); - assertEquals(1, md.size()); - TabletMetadata m = md.get(0); - Map<ExternalCompactionId,ExternalCompactionMetadata> em = m.getExternalCompactions(); - assertEquals(1, em.size()); - List<ExternalCompactionFinalState> finished = new ArrayList<>(); - this.getFinalStatesForTable(tid).forEach(f -> finished.add(f)); - assertEquals(1, finished.size()); - assertEquals(em.entrySet().iterator().next().getKey(), - finished.get(0).getExternalCompactionId()); - tm.close(); - - // Force a flush on the metadata table before killing our tserver - client.tableOperations().flush("accumulo.metadata"); - - // Stop our TabletServer. Need to perform a normal shutdown so that the WAL is closed - // normally. - LOG.info("Stopping our tablet server"); - stopProcesses(tserv); - - // Start a TabletServer to commit the compaction. - LOG.info("Starting normal tablet server"); - getCluster().getClusterControl().start(ServerType.TABLET_SERVER); - - // Wait for the compaction to be committed. 
- LOG.info("Waiting for compaction completed marker to disappear"); - Stream<ExternalCompactionFinalState> fs2 = this.getFinalStatesForTable(tid); - while (fs2.count() != 0) { - LOG.info("Waiting for compaction completed marker to disappear"); - UtilWaitThread.sleep(500); - fs2 = this.getFinalStatesForTable(tid); - } - verify(client, table3, 2); - } finally { - stopProcesses(c1, coord); - } - - } - - public static class FSelector implements CompactionSelector { - - @Override - public void init(InitParameters iparams) {} - - @Override - public Selection select(SelectionParameters sparams) { - List<CompactableFile> toCompact = sparams.getAvailableFiles().stream() - .filter(cf -> cf.getFileName().startsWith("F")).collect(Collectors.toList()); - return new Selection(toCompact); - } - - } - - @Test - public void testPartialCompaction() throws Exception { - ProcessInfo c1 = null, coord = null; - String tableName = getUniqueNames(1)[0]; - try (final AccumuloClient client = Accumulo.newClient() - .from(SharedMiniClusterBase.getCluster().getClientProperties()).build()) { - - c1 = SharedMiniClusterBase.getCluster().exec(Compactor.class, "-q", "DCQ1"); - coord = SharedMiniClusterBase.getCluster().exec(CompactionCoordinator.class); - - createTable(client, tableName, "cs1"); - - writeData(client, tableName); - // This should create an A file - compact(client, tableName, 17, "DCQ1", true); - verify(client, tableName, 17); - - try (BatchWriter bw = client.createBatchWriter(tableName)) { - for (int i = MAX_DATA; i < MAX_DATA * 2; i++) { - Mutation m = new Mutation(row(i)); - m.put("", "", "" + i); - bw.addMutation(m); - } - } - - // this should create an F file - client.tableOperations().flush(tableName); - - // run a compaction that only compacts F files - IteratorSetting iterSetting = new IteratorSetting(100, TestFilter.class); - // make sure iterator options make it to compactor process - iterSetting.addOption("expectedQ", "DCQ1"); - // compact F file w/ different modulus and 
user pmodulus option for partial compaction - iterSetting.addOption("pmodulus", 19 + ""); - CompactionConfig config = new CompactionConfig().setIterators(List.of(iterSetting)) - .setWait(true).setSelector(new PluginConfig(FSelector.class.getName())); - client.tableOperations().compact(tableName, config); - - try (Scanner scanner = client.createScanner(tableName)) { - int count = 0; - for (Entry<Key,Value> entry : scanner) { - - int v = Integer.parseInt(entry.getValue().toString()); - int modulus = v < MAX_DATA ? 17 : 19; - - assertTrue(String.format("%s %s %d != 0", entry.getValue(), "%", modulus), - Integer.parseInt(entry.getValue().toString()) % modulus == 0); - count++; - } - - int expectedCount = 0; - for (int i = 0; i < MAX_DATA * 2; i++) { - int modulus = i < MAX_DATA ? 17 : 19; - if (i % modulus == 0) { - expectedCount++; - } - } - - assertEquals(expectedCount, count); - } - - } finally { - stopProcesses(c1, coord); - } - } - - private static Optional<String> extract(String input, String regex) { - Pattern pattern = Pattern.compile(regex); - Matcher matcher = pattern.matcher(input); - if (matcher.matches()) { - return Optional.of(matcher.group(1)); - } - - return Optional.empty(); - } - - @Test - public void testMetrics() throws Exception { - Collection<ProcessReference> tservers = - getCluster().getProcesses().get(ServerType.TABLET_SERVER); - assertEquals(2, tservers.size()); - // kill one tserver so that queue metrics are not spread across tservers - getCluster().killProcess(TABLET_SERVER, tservers.iterator().next()); - ProcessInfo c1 = null, c2 = null, coord = null; - String[] names = getUniqueNames(2); - try (final AccumuloClient client = Accumulo.newClient() - .from(SharedMiniClusterBase.getCluster().getClientProperties()).build()) { - String table1 = names[0]; - createTable(client, table1, "cs1", 5); - - String table2 = names[1]; - createTable(client, table2, "cs2", 10); - - writeData(client, table1); - writeData(client, table2); - - 
LinkedBlockingQueue<String> queueMetrics = new LinkedBlockingQueue<>(); - - Tailer tailer = - Tailer.create(new File("./target/tserver.metrics"), new TailerListenerAdapter() { - @Override - public void handle(final String line) { - extract(line, ".*(DCQ1_queued=[0-9]+).*").ifPresent(queueMetrics::add); - extract(line, ".*(DCQ2_queued=[0-9]+).*").ifPresent(queueMetrics::add); - } - }); - - compact(client, table1, 7, "DCQ1", false); - compact(client, table2, 13, "DCQ2", false); - - boolean sawDCQ1_5 = false; - boolean sawDCQ2_10 = false; - - // wait until expected number of queued are seen in metrics - while (!sawDCQ1_5 || !sawDCQ2_10) { - String qm = queueMetrics.take(); - sawDCQ1_5 |= qm.equals("DCQ1_queued=5"); - sawDCQ2_10 |= qm.equals("DCQ2_queued=10"); - } - - // start compactors - c1 = SharedMiniClusterBase.getCluster().exec(Compactor.class, "-q", "DCQ1"); - c2 = SharedMiniClusterBase.getCluster().exec(Compactor.class, "-q", "DCQ2"); - coord = SharedMiniClusterBase.getCluster().exec(CompactionCoordinator.class); - - boolean sawDCQ1_0 = false; - boolean sawDCQ2_0 = false; - - // wait until queued goes to zero in metrics - while (!sawDCQ1_0 || !sawDCQ2_0) { - String qm = queueMetrics.take(); - sawDCQ1_0 |= qm.equals("DCQ1_queued=0"); - sawDCQ2_0 |= qm.equals("DCQ2_queued=0"); - } - - tailer.stop(); - - // Wait for all external compactions to complete - long count; - do { - UtilWaitThread.sleep(100); - try (TabletsMetadata tm = getCluster().getServerContext().getAmple().readTablets() - .forLevel(DataLevel.USER).fetch(ColumnType.ECOMP).build()) { - count = tm.stream().flatMap(t -> t.getExternalCompactions().keySet().stream()).count(); - } - } while (count > 0); - - verify(client, table1, 7); - verify(client, table2, 13); - - } finally { - stopProcesses(c1, c2, coord); - // We stopped the TServer and started our own, restart the original TabletServers - getCluster().getClusterControl().start(ServerType.TABLET_SERVER); - } - } - - private ExternalCompactionMetrics 
getCoordinatorMetrics() throws Exception { - HttpResponse<String> res = hc.send(req, BodyHandlers.ofString()); - assertEquals(200, res.statusCode()); - String metrics = res.body(); - assertNotNull(metrics); - return new Gson().fromJson(metrics, ExternalCompactionMetrics.class); - } - - private void verify(AccumuloClient client, String table1, int modulus) - throws TableNotFoundException, AccumuloSecurityException, AccumuloException { - try (Scanner scanner = client.createScanner(table1)) { - int count = 0; - for (Entry<Key,Value> entry : scanner) { - assertTrue(String.format("%s %s %d != 0", entry.getValue(), "%", modulus), - Integer.parseInt(entry.getValue().toString()) % modulus == 0); - count++; - } - - int expectedCount = 0; - for (int i = 0; i < MAX_DATA; i++) { - if (i % modulus == 0) - expectedCount++; - } - - assertEquals(expectedCount, count); - } - } - - private void compact(final AccumuloClient client, String table1, int modulus, - String expectedQueue, boolean wait) - throws AccumuloSecurityException, TableNotFoundException, AccumuloException { - IteratorSetting iterSetting = new IteratorSetting(100, TestFilter.class); - // make sure iterator options make it to compactor process - iterSetting.addOption("expectedQ", expectedQueue); - iterSetting.addOption("modulus", modulus + ""); - CompactionConfig config = - new CompactionConfig().setIterators(List.of(iterSetting)).setWait(wait); - client.tableOperations().compact(table1, config); - } - - private void createTable(AccumuloClient client, String tableName, String service) - throws Exception { - Map<String,String> props = - Map.of("table.compaction.dispatcher", SimpleCompactionDispatcher.class.getName(), - "table.compaction.dispatcher.opts.service", service); - NewTableConfiguration ntc = new NewTableConfiguration().setProperties(props); - - client.tableOperations().create(tableName, ntc); - - } - - private void createTable(AccumuloClient client, String tableName, String service, int numTablets) - throws 
Exception { - SortedSet<Text> splits = new TreeSet<>(); - int jump = MAX_DATA / numTablets; - - for (int r = jump; r < MAX_DATA; r += jump) { - splits.add(new Text(row(r))); - } - - createTable(client, tableName, service, splits); - } - - private void createTable(AccumuloClient client, String tableName, String service, - SortedSet<Text> splits) throws Exception { - Map<String,String> props = - Map.of("table.compaction.dispatcher", SimpleCompactionDispatcher.class.getName(), - "table.compaction.dispatcher.opts.service", service); - NewTableConfiguration ntc = new NewTableConfiguration().setProperties(props).withSplits(splits); - - client.tableOperations().create(tableName, ntc); - - } - - private void writeData(AccumuloClient client, String table1) throws MutationsRejectedException, - TableNotFoundException, AccumuloException, AccumuloSecurityException { - try (BatchWriter bw = client.createBatchWriter(table1)) { - for (int i = 0; i < MAX_DATA; i++) { - Mutation m = new Mutation(row(i)); - m.put("", "", "" + i); - bw.addMutation(m); - } - } - - client.tableOperations().flush(table1); - } -} diff --git a/test/src/main/java/org/apache/accumulo/test/ShellServerIT.java b/test/src/main/java/org/apache/accumulo/test/ShellServerIT.java index a5c8cfe..12ab0b1 100644 --- a/test/src/main/java/org/apache/accumulo/test/ShellServerIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ShellServerIT.java @@ -95,6 +95,7 @@ import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; import org.apache.accumulo.shell.Shell; import org.apache.accumulo.test.categories.MiniClusterOnlyTests; import org.apache.accumulo.test.categories.SunnyDayTests; +import org.apache.accumulo.test.compaction.TestCompactionStrategy; import org.apache.accumulo.test.functional.SlowIterator; import org.apache.accumulo.tracer.TraceServer; import org.apache.commons.io.FileUtils; diff --git a/test/src/main/java/org/apache/accumulo/test/CompactionExecutorIT.java 
b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionExecutorIT.java similarity index 99% rename from test/src/main/java/org/apache/accumulo/test/CompactionExecutorIT.java rename to test/src/main/java/org/apache/accumulo/test/compaction/CompactionExecutorIT.java index b2aafb7..298e28a 100644 --- a/test/src/main/java/org/apache/accumulo/test/CompactionExecutorIT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionExecutorIT.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.accumulo.test; +package org.apache.accumulo.test.compaction; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; diff --git a/test/src/main/java/org/apache/accumulo/test/CompactionRateLimitingDeprecatedIT.java b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionRateLimitingDeprecatedIT.java similarity index 95% rename from test/src/main/java/org/apache/accumulo/test/CompactionRateLimitingDeprecatedIT.java rename to test/src/main/java/org/apache/accumulo/test/compaction/CompactionRateLimitingDeprecatedIT.java index f43e585..c1e7553 100644 --- a/test/src/main/java/org/apache/accumulo/test/CompactionRateLimitingDeprecatedIT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionRateLimitingDeprecatedIT.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.accumulo.test; +package org.apache.accumulo.test.compaction; import org.apache.accumulo.core.conf.Property; diff --git a/test/src/main/java/org/apache/accumulo/test/CompactionRateLimitingIT.java b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionRateLimitingIT.java similarity index 98% rename from test/src/main/java/org/apache/accumulo/test/CompactionRateLimitingIT.java rename to test/src/main/java/org/apache/accumulo/test/compaction/CompactionRateLimitingIT.java index 84b22f6..cee1319 100644 --- a/test/src/main/java/org/apache/accumulo/test/CompactionRateLimitingIT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionRateLimitingIT.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.accumulo.test; +package org.apache.accumulo.test.compaction; import static org.junit.Assert.assertTrue; diff --git a/test/src/main/java/org/apache/accumulo/test/ConfigurableMajorCompactionIT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ConfigurableMajorCompactionIT.java similarity index 99% rename from test/src/main/java/org/apache/accumulo/test/ConfigurableMajorCompactionIT.java rename to test/src/main/java/org/apache/accumulo/test/compaction/ConfigurableMajorCompactionIT.java index 299fb60..cb3ff82 100644 --- a/test/src/main/java/org/apache/accumulo/test/ConfigurableMajorCompactionIT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ConfigurableMajorCompactionIT.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.accumulo.test; +package org.apache.accumulo.test.compaction; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; diff --git a/test/src/main/java/org/apache/accumulo/test/ExternalCompactionTServer.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTServer.java similarity index 97% rename from test/src/main/java/org/apache/accumulo/test/ExternalCompactionTServer.java rename to test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTServer.java index 2282e47..2cac5fa 100644 --- a/test/src/main/java/org/apache/accumulo/test/ExternalCompactionTServer.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTServer.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.accumulo.test; +package org.apache.accumulo.test.compaction; import org.apache.accumulo.server.ServerOpts; import org.apache.accumulo.tserver.TabletServer; diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionUtils.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionUtils.java new file mode 100644 index 0000000..a3512c0 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionUtils.java @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.compaction; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.http.HttpClient; +import java.net.http.HttpClient.Redirect; +import java.net.http.HttpClient.Version; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.net.http.HttpResponse.BodyHandlers; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.stream.Stream; + +import org.apache.accumulo.cluster.AccumuloCluster; +import org.apache.accumulo.coordinator.CompactionCoordinator; +import org.apache.accumulo.coordinator.ExternalCompactionMetrics; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.CompactionConfig; +import org.apache.accumulo.core.client.admin.NewTableConfiguration; +import org.apache.accumulo.core.conf.ClientProperty; +import 
org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.metadata.schema.ExternalCompactionFinalState; +import org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner; +import org.apache.accumulo.core.spi.compaction.SimpleCompactionDispatcher; +import org.apache.accumulo.fate.util.UtilWaitThread; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl.ProcessInfo; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.accumulo.test.compaction.ExternalCompaction_1_IT.TestFilter; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.RawLocalFileSystem; +import org.apache.hadoop.io.Text; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.beust.jcommander.internal.Maps; +import com.google.gson.Gson; + +public class ExternalCompactionUtils { + + private static final Logger LOG = LoggerFactory.getLogger(ExternalCompactionUtils.class); + + public static final int MAX_DATA = 1000; + + public static String row(int r) { + return String.format("r:%04d", r); + } + + public static Stream<ExternalCompactionFinalState> getFinalStatesForTable(AccumuloCluster cluster, + TableId tid) { + return cluster.getServerContext().getAmple().getExternalCompactionFinalStates() + .filter(state -> state.getExtent().tableId().equals(tid)); + } + + public static void compact(final AccumuloClient client, String table1, int modulus, + String expectedQueue, boolean wait) + throws AccumuloSecurityException, TableNotFoundException, AccumuloException { + IteratorSetting iterSetting = new IteratorSetting(100, TestFilter.class); + // make sure iterator options make it to compactor process + iterSetting.addOption("expectedQ", expectedQueue); + 
iterSetting.addOption("modulus", modulus + ""); + CompactionConfig config = + new CompactionConfig().setIterators(List.of(iterSetting)).setWait(wait); + client.tableOperations().compact(table1, config); + } + + public static void createTable(AccumuloClient client, String tableName, String service) + throws Exception { + Map<String,String> props = + Map.of("table.compaction.dispatcher", SimpleCompactionDispatcher.class.getName(), + "table.compaction.dispatcher.opts.service", service); + NewTableConfiguration ntc = new NewTableConfiguration().setProperties(props); + + client.tableOperations().create(tableName, ntc); + + } + + public static void createTable(AccumuloClient client, String tableName, String service, + int numTablets) throws Exception { + SortedSet<Text> splits = new TreeSet<>(); + int jump = MAX_DATA / numTablets; + + for (int r = jump; r < MAX_DATA; r += jump) { + splits.add(new Text(row(r))); + } + + createTable(client, tableName, service, splits); + } + + public static void createTable(AccumuloClient client, String tableName, String service, + SortedSet<Text> splits) throws Exception { + Map<String,String> props = + Map.of("table.compaction.dispatcher", SimpleCompactionDispatcher.class.getName(), + "table.compaction.dispatcher.opts.service", service); + NewTableConfiguration ntc = new NewTableConfiguration().setProperties(props).withSplits(splits); + + client.tableOperations().create(tableName, ntc); + + } + + public static void writeData(AccumuloClient client, String table1) + throws MutationsRejectedException, TableNotFoundException, AccumuloException, + AccumuloSecurityException { + try (BatchWriter bw = client.createBatchWriter(table1)) { + for (int i = 0; i < MAX_DATA; i++) { + Mutation m = new Mutation(row(i)); + m.put("", "", "" + i); + bw.addMutation(m); + } + } + + client.tableOperations().flush(table1); + } + + public static void verify(AccumuloClient client, String table1, int modulus) + throws TableNotFoundException, 
AccumuloSecurityException, AccumuloException { + try (Scanner scanner = client.createScanner(table1)) { + int count = 0; + for (Entry<Key,Value> entry : scanner) { + assertTrue(String.format("%s %s %d != 0", entry.getValue(), "%", modulus), + Integer.parseInt(entry.getValue().toString()) % modulus == 0); + count++; + } + + int expectedCount = 0; + for (int i = 0; i < MAX_DATA; i++) { + if (i % modulus == 0) + expectedCount++; + } + + assertEquals(expectedCount, count); + } + } + + public static void stopProcesses(ProcessInfo... processes) throws Exception { + for (ProcessInfo p : processes) { + if (p != null) { + Process proc = p.getProcess(); + if (proc.supportsNormalTermination()) { + LOG.info("Stopping process {}", proc.pid()); + proc.destroyForcibly().waitFor(); + } else { + LOG.info("Stopping process {} manually", proc.pid()); + new ProcessBuilder("kill", Long.toString(proc.pid())).start(); + proc.waitFor(); + } + } + } + } + + public static void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreSite) { + + // ecomp writes from the TabletServer are not being written to the metadata + // table, they are being queued up instead. 
+ Map<String,String> clProps = Maps.newHashMap(); + clProps.put(ClientProperty.BATCH_WRITER_LATENCY_MAX.getKey(), "2s"); + cfg.setClientProps(clProps); + + cfg.setProperty("tserver.compaction.major.service.cs1.planner", + DefaultCompactionPlanner.class.getName()); + cfg.setProperty("tserver.compaction.major.service.cs1.planner.opts.executors", + "[{'name':'all', 'type': 'external', 'queue': 'DCQ1'}]"); + cfg.setProperty("tserver.compaction.major.service.cs2.planner", + DefaultCompactionPlanner.class.getName()); + cfg.setProperty("tserver.compaction.major.service.cs2.planner.opts.executors", + "[{'name':'all', 'type': 'external','queue': 'DCQ2'}]"); + cfg.setProperty(Property.COMPACTION_COORDINATOR_FINALIZER_COMPLETION_CHECK_INTERVAL, "5s"); + cfg.setProperty(Property.COMPACTION_COORDINATOR_DEAD_COMPACTOR_CHECK_INTERVAL, "5s"); + cfg.setProperty(Property.COMPACTION_COORDINATOR_TSERVER_COMPACTION_CHECK_INTERVAL, "3s"); + cfg.setProperty(Property.COMPACTOR_PORTSEARCH, "true"); + cfg.setProperty(Property.GENERAL_SIMPLETIMER_THREADPOOL_SIZE, "10"); + cfg.setProperty(Property.MANAGER_FATE_THREADPOOL_SIZE, "10"); + // use raw local file system so walogs sync and flush will work + coreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); + } + + private static HttpRequest req = null; + static { + try { + req = HttpRequest.newBuilder().GET().uri(new URI("http://localhost:9099/metrics")).build(); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + } + + private static final HttpClient hc = + HttpClient.newBuilder().version(Version.HTTP_1_1).followRedirects(Redirect.NORMAL).build(); + + public static ExternalCompactionMetrics getCoordinatorMetrics() throws Exception { + HttpResponse<String> res = hc.send(req, BodyHandlers.ofString()); + assertEquals(200, res.statusCode()); + String metrics = res.body(); + assertNotNull(metrics); + return new Gson().fromJson(metrics, ExternalCompactionMetrics.class); + } + + public static ProcessInfo 
startCoordinator(MiniAccumuloClusterImpl cluster, + Class<? extends CompactionCoordinator> coord) throws IOException { + ProcessInfo pi = cluster.exec(coord); + if (TestCompactionCoordinator.class.isAssignableFrom(coord)) { + // Wait for coordinator to start + ExternalCompactionMetrics metrics = null; + while (null == metrics) { + try { + metrics = getCoordinatorMetrics(); + } catch (Exception e) { + UtilWaitThread.sleep(250); + } + } + } + return pi; + } + +} diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java new file mode 100644 index 0000000..0923489 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java @@ -0,0 +1,617 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.test.compaction; + +import static org.apache.accumulo.minicluster.ServerType.TABLET_SERVER; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.EnumSet; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.apache.accumulo.compactor.Compactor; +import org.apache.accumulo.compactor.ExtCEnv.CompactorIterEnv; +import org.apache.accumulo.coordinator.CompactionCoordinator; +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.admin.CompactionConfig; +import org.apache.accumulo.core.client.admin.NewTableConfiguration; +import org.apache.accumulo.core.client.admin.PluginConfig; +import org.apache.accumulo.core.client.admin.compaction.CompactableFile; +import org.apache.accumulo.core.client.admin.compaction.CompactionSelector; +import org.apache.accumulo.core.client.admin.compaction.CompressionConfigurer; +import org.apache.accumulo.core.clientImpl.Tables; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.TableId; +import 
org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.DevNull; +import org.apache.accumulo.core.iterators.Filter; +import org.apache.accumulo.core.iterators.IteratorEnvironment; +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; +import org.apache.accumulo.core.metadata.schema.ExternalCompactionFinalState; +import org.apache.accumulo.core.metadata.schema.ExternalCompactionFinalState.FinalState; +import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; +import org.apache.accumulo.core.metadata.schema.ExternalCompactionMetadata; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; +import org.apache.accumulo.core.metadata.schema.TabletsMetadata; +import org.apache.accumulo.core.spi.compaction.SimpleCompactionDispatcher; +import org.apache.accumulo.fate.util.UtilWaitThread; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.harness.MiniClusterConfigurationCallback; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl.ProcessInfo; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.accumulo.miniclusterImpl.ProcessReference; +import org.apache.commons.io.input.Tailer; +import org.apache.commons.io.input.TailerListenerAdapter; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +
/**
 * Integration tests that run compactions in external Compactor processes. Each test launches the
 * Compactor / CompactionCoordinator processes it needs and stops them again in a {@code finally}
 * block so that a failing assertion does not leave them running.
 */
public class ExternalCompaction_1_IT extends AccumuloClusterHarness
    implements MiniClusterConfigurationCallback {

  private static final Logger LOG = LoggerFactory.getLogger(ExternalCompaction_1_IT.class);

  // Compiled once: the tailer listener in testMetrics applies these to every tailed line.
  private static final Pattern DCQ1_QUEUED = Pattern.compile(".*(DCQ1_queued=[0-9]+).*");
  private static final Pattern DCQ2_QUEUED = Pattern.compile(".*(DCQ2_queued=[0-9]+).*");

  @Override
  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreSite) {
    ExternalCompactionUtils.configureMiniCluster(cfg, coreSite);
  }

  /**
   * Filter that keeps entries whose integer value is divisible by a configured modulus and that
   * checks, in {@link #init}, that it is running inside an external compactor.
   */
  public static class TestFilter extends Filter {

    int modulus = 1;

    @Override
    public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options,
        IteratorEnvironment env) throws IOException {
      super.init(source, options, env);

      // this cast should fail if the compaction is running in the tserver
      CompactorIterEnv cienv = (CompactorIterEnv) env;

      Preconditions.checkArgument(!cienv.getQueueName().isEmpty());
      Preconditions
          .checkArgument(options.getOrDefault("expectedQ", "").equals(cienv.getQueueName()));
      Preconditions.checkArgument(cienv.isUserCompaction());
      Preconditions.checkArgument(cienv.getIteratorScope() == IteratorScope.majc);
      Preconditions.checkArgument(!cienv.isSamplingEnabled());

      // if the init function is never called at all, then not setting the modulus option should
      // cause the test to fail
      if (options.containsKey("modulus")) {
        Preconditions.checkArgument(!options.containsKey("pmodulus"));
        Preconditions.checkArgument(cienv.isFullMajorCompaction());
        modulus = Integer.parseInt(options.get("modulus"));
      }

      // use when partial compaction is expected
      if (options.containsKey("pmodulus")) {
        Preconditions.checkArgument(!options.containsKey("modulus"));
        Preconditions.checkArgument(!cienv.isFullMajorCompaction());
        modulus = Integer.parseInt(options.get("pmodulus"));
      }
    }

    @Override
    public boolean accept(Key k, Value v) {
      return Integer.parseInt(v.toString()) % modulus == 0;
    }

  }

  /** Kills every tablet server of the mini cluster, failing the test if one cannot be killed. */
  private void killAllTabletServers() {
    ((MiniAccumuloClusterImpl) getCluster()).getProcesses().get(TABLET_SERVER).forEach(p -> {
      try {
        ((MiniAccumuloClusterImpl) getCluster()).killProcess(TABLET_SERVER, p);
      } catch (Exception e) {
        fail("Failed to shutdown tablet server");
      }
    });
  }

  /** Compacts two tables, each dispatched to its own external queue, and verifies the result. */
  @Test
  public void testExternalCompaction() throws Exception {
    ProcessInfo c1 = null, c2 = null, coord = null;
    String[] names = this.getUniqueNames(2);
    try (AccumuloClient client =
        Accumulo.newClient().from(getCluster().getClientProperties()).build()) {

      String table1 = names[0];
      ExternalCompactionUtils.createTable(client, table1, "cs1");

      String table2 = names[1];
      ExternalCompactionUtils.createTable(client, table2, "cs2");

      ExternalCompactionUtils.writeData(client, table1);
      ExternalCompactionUtils.writeData(client, table2);

      c1 = ((MiniAccumuloClusterImpl) getCluster()).exec(Compactor.class, "-q", "DCQ1");
      c2 = ((MiniAccumuloClusterImpl) getCluster()).exec(Compactor.class, "-q", "DCQ2");
      coord = ((MiniAccumuloClusterImpl) getCluster()).exec(CompactionCoordinator.class);

      ExternalCompactionUtils.compact(client, table1, 2, "DCQ1", true);
      ExternalCompactionUtils.verify(client, table1, 2);

      SortedSet<Text> splits = new TreeSet<>();
      splits.add(new Text(ExternalCompactionUtils.row(ExternalCompactionUtils.MAX_DATA / 2)));
      client.tableOperations().addSplits(table2, splits);

      ExternalCompactionUtils.compact(client, table2, 3, "DCQ2", true);
      ExternalCompactionUtils.verify(client, table2, 3);

    } finally {
      // Stop the Compactor and Coordinator that we started
      ExternalCompactionUtils.stopProcesses(c1, c2, coord);
    }
  }

  /**
   * Kills the compactor while a compaction is outstanding and waits for a FAILED final state to
   * be recorded for the table.
   */
  @Test
  public void testCompactionAndCompactorDies() throws Exception {
    String table1 = this.getUniqueNames(1)[0];
    // Declared outside the try so the finally block can stop whatever was actually started.
    ProcessInfo tserv = null, c1 = null, coord = null;
    try (AccumuloClient client =
        Accumulo.newClient().from(getCluster().getClientProperties()).build()) {
      // Stop the TabletServer so that it does not commit the compaction
      killAllTabletServers();
      // Start our TServer that will not commit the compaction
      tserv = ((MiniAccumuloClusterImpl) getCluster()).exec(ExternalCompactionTServer.class);

      ExternalCompactionUtils.createTable(client, table1, "cs1", 2);
      ExternalCompactionUtils.writeData(client, table1);
      c1 = ((MiniAccumuloClusterImpl) getCluster()).exec(ExternalDoNothingCompactor.class, "-q",
          "DCQ1");
      coord = ((MiniAccumuloClusterImpl) getCluster()).exec(CompactionCoordinator.class);
      ExternalCompactionUtils.compact(client, table1, 2, "DCQ1", false);
      TableId tid = Tables.getTableId(getCluster().getServerContext(), table1);
      // Wait for the compaction to start by waiting for 1 external compaction column
      Set<ExternalCompactionId> ecids = new HashSet<>();
      do {
        UtilWaitThread.sleep(250);
        try (TabletsMetadata tm = getCluster().getServerContext().getAmple().readTablets()
            .forTable(tid).fetch(ColumnType.ECOMP).build()) {
          tm.stream().flatMap(t -> t.getExternalCompactions().keySet().stream())
              .forEach(ecids::add);
        }
      } while (ecids.isEmpty());

      // Kill the compactor
      ExternalCompactionUtils.stopProcesses(c1);
      c1 = null; // already stopped, nothing left for the finally block to do

      // DeadCompactionDetector in the CompactionCoordinator should fail the compaction.
      long count = 0;
      while (count == 0) {
        count = ExternalCompactionUtils.getFinalStatesForTable(getCluster(), tid)
            .filter(state -> state.getFinalState().equals(FinalState.FAILED)).count();
        UtilWaitThread.sleep(250);
      }
    } finally {
      try {
        // Stop the processes we started, also when the test failed part way through
        ExternalCompactionUtils.stopProcesses(c1, tserv, coord);
      } finally {
        // We stopped the TServer and started our own, restart the original TabletServers
        ((MiniAccumuloClusterImpl) getCluster()).getClusterControl()
            .start(ServerType.TABLET_SERVER);
      }
    }

  }

  /** Compacts a table created with 200 tablets using four compactors on the same queue. */
  @Test
  public void testManytablets() throws Exception {
    ProcessInfo c1 = null, c2 = null, c3 = null, c4 = null, coord = null;
    String table1 = this.getUniqueNames(1)[0];
    try (AccumuloClient client =
        Accumulo.newClient().from(getCluster().getClientProperties()).build()) {

      ExternalCompactionUtils.createTable(client, table1, "cs1", 200);

      ExternalCompactionUtils.writeData(client, table1);

      c1 = ((MiniAccumuloClusterImpl) getCluster()).exec(Compactor.class, "-q", "DCQ1");
      c2 = ((MiniAccumuloClusterImpl) getCluster()).exec(Compactor.class, "-q", "DCQ1");
      c3 = ((MiniAccumuloClusterImpl) getCluster()).exec(Compactor.class, "-q", "DCQ1");
      c4 = ((MiniAccumuloClusterImpl) getCluster()).exec(Compactor.class, "-q", "DCQ1");
      coord = ((MiniAccumuloClusterImpl) getCluster()).exec(CompactionCoordinator.class);

      ExternalCompactionUtils.compact(client, table1, 3, "DCQ1", true);

      ExternalCompactionUtils.verify(client, table1, 3);
    } finally {
      ExternalCompactionUtils.stopProcesses(c1, c2, c3, c4, coord);
    }
  }

  /** Checks that a compaction configurer (compression type) is honored by external compactions. */
  @Test
  public void testConfigurer() throws Exception {
    String tableName = this.getUniqueNames(1)[0];

    ProcessInfo c1 = null, coord = null;
    try (AccumuloClient client =
        Accumulo.newClient().from(getCluster().getClientProperties()).build()) {

      // Started inside the try so a failure starting the second process still stops the first.
      c1 = ((MiniAccumuloClusterImpl) getCluster()).exec(Compactor.class, "-q", "DCQ1");
      coord = ((MiniAccumuloClusterImpl) getCluster()).exec(CompactionCoordinator.class);

      Map<String,String> props = Map.of("table.compaction.dispatcher",
          SimpleCompactionDispatcher.class.getName(), "table.compaction.dispatcher.opts.service",
          "cs1", Property.TABLE_FILE_COMPRESSION_TYPE.getKey(), "none");
      NewTableConfiguration ntc = new NewTableConfiguration().setProperties(props);
      client.tableOperations().create(tableName, ntc);

      byte[] data = new byte[100000];
      Arrays.fill(data, (byte) 65);
      try (var writer = client.createBatchWriter(tableName)) {
        for (int row = 0; row < 10; row++) {
          Mutation m = new Mutation(row + "");
          m.at().family("big").qualifier("stuff").put(data);
          writer.addMutation(m);
        }
      }
      client.tableOperations().flush(tableName, null, null, true);

      // without compression, expect file to be large
      long sizes = CompactionExecutorIT.getFileSizes(client, tableName);
      assertTrue("Unexpected files sizes : " + sizes,
          sizes > data.length * 10 && sizes < data.length * 11);

      client.tableOperations().compact(tableName,
          new CompactionConfig().setWait(true)
              .setConfigurer(new PluginConfig(CompressionConfigurer.class.getName(),
                  Map.of(CompressionConfigurer.LARGE_FILE_COMPRESSION_TYPE, "gz",
                      CompressionConfigurer.LARGE_FILE_COMPRESSION_THRESHOLD, data.length + ""))));

      // after compacting with compression, expect small file
      sizes = CompactionExecutorIT.getFileSizes(client, tableName);
      assertTrue("Unexpected files sizes: data: " + data.length + ", file:" + sizes,
          sizes < data.length);

      client.tableOperations().compact(tableName, new CompactionConfig().setWait(true));

      // after compacting without compression, expect big files again
      sizes = CompactionExecutorIT.getFileSizes(client, tableName);
      assertTrue("Unexpected files sizes : " + sizes,
          sizes > data.length * 10 && sizes < data.length * 11);

    } finally {
      ExternalCompactionUtils.stopProcesses(c1, coord);
    }
  }

  /** DevNull iterator that additionally checks it is running inside an external compactor. */
  public static class ExtDevNull extends DevNull {
    @Override
    public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options,
        IteratorEnvironment env) throws IOException {
      super.init(source, options, env);

      // this cast should fail if the compaction is running in the tserver
      CompactorIterEnv cienv = (CompactorIterEnv) env;

      Preconditions.checkArgument(!cienv.getQueueName().isEmpty());
    }
  }

  @Test
  public void testExternalCompactionWithTableIterator() throws Exception {
    // in addition to testing table configured iters w/ external compaction, this also tests an
    // external compaction that deletes everything

    ProcessInfo c1 = null, coord = null;
    String table1 = this.getUniqueNames(1)[0];
    try (AccumuloClient client =
        Accumulo.newClient().from(getCluster().getClientProperties()).build()) {
      ExternalCompactionUtils.createTable(client, table1, "cs1");
      ExternalCompactionUtils.writeData(client, table1);
      c1 = ((MiniAccumuloClusterImpl) getCluster()).exec(Compactor.class, "-q", "DCQ1");
      coord = ((MiniAccumuloClusterImpl) getCluster()).exec(CompactionCoordinator.class);
      ExternalCompactionUtils.compact(client, table1, 2, "DCQ1", true);
      ExternalCompactionUtils.verify(client, table1, 2);

      IteratorSetting setting = new IteratorSetting(50, "delete", ExtDevNull.class);
      client.tableOperations().attachIterator(table1, setting, EnumSet.of(IteratorScope.majc));
      client.tableOperations().compact(table1, new CompactionConfig().setWait(true));

      try (Scanner s = client.createScanner(table1)) {
        assertFalse(s.iterator().hasNext());
      }
    } finally {
      ExternalCompactionUtils.stopProcesses(c1, coord);
    }
  }

  /**
   * Runs a compaction while only a non-committing tablet server is up, then swaps in a normal
   * tablet server and waits for the compaction's final-state marker to disappear.
   */
  @Test
  public void testExternalCompactionDeadTServer() throws Exception {
    // Shut down the normal TServers
    killAllTabletServers();
    // Start our TServer that will not commit the compaction
    ProcessInfo tserv =
        ((MiniAccumuloClusterImpl) getCluster()).exec(ExternalCompactionTServer.class);

    final String table3 = this.getUniqueNames(1)[0];
    ProcessInfo c1 = null, coord = null;
    try (final AccumuloClient client =
        Accumulo.newClient().from(getCluster().getClientProperties()).build()) {
      ExternalCompactionUtils.createTable(client, table3, "cs1");
      ExternalCompactionUtils.writeData(client, table3);
      c1 = ((MiniAccumuloClusterImpl) getCluster()).exec(Compactor.class, "-q", "DCQ1");
      coord = ((MiniAccumuloClusterImpl) getCluster()).exec(CompactionCoordinator.class);
      ExternalCompactionUtils.compact(client, table3, 2, "DCQ1", false);

      // ExternalCompactionTServer will not commit the compaction. Wait for the
      // metadata table entries to show up.
      LOG.info("Waiting for external compaction to complete.");
      TableId tid = Tables.getTableId(getCluster().getServerContext(), table3);
      Stream<ExternalCompactionFinalState> fs =
          ExternalCompactionUtils.getFinalStatesForTable(getCluster(), tid);
      while (fs.count() == 0) {
        LOG.info("Waiting for compaction completed marker to appear");
        UtilWaitThread.sleep(250);
        fs = ExternalCompactionUtils.getFinalStatesForTable(getCluster(), tid);
      }

      LOG.info("Validating metadata table contents.");
      // try-with-resources: the reader is closed even when one of the assertions fails
      try (TabletsMetadata tm = getCluster().getServerContext().getAmple().readTablets()
          .forTable(tid).fetch(ColumnType.ECOMP).build()) {
        List<TabletMetadata> md = new ArrayList<>();
        tm.forEach(md::add);
        assertEquals(1, md.size());
        TabletMetadata m = md.get(0);
        Map<ExternalCompactionId,ExternalCompactionMetadata> em = m.getExternalCompactions();
        assertEquals(1, em.size());
        List<ExternalCompactionFinalState> finished = new ArrayList<>();
        ExternalCompactionUtils.getFinalStatesForTable(getCluster(), tid).forEach(finished::add);
        assertEquals(1, finished.size());
        assertEquals(em.entrySet().iterator().next().getKey(),
            finished.get(0).getExternalCompactionId());
      }

      // Force a flush on the metadata table before killing our tserver
      client.tableOperations().flush("accumulo.metadata");

      // Stop our TabletServer. Need to perform a normal shutdown so that the WAL is closed
      // normally.
      LOG.info("Stopping our tablet server");
      ExternalCompactionUtils.stopProcesses(tserv);
      tserv = null; // already stopped, nothing left for the finally block to do

      // Start a TabletServer to commit the compaction.
      LOG.info("Starting normal tablet server");
      ((MiniAccumuloClusterImpl) getCluster()).getClusterControl().start(ServerType.TABLET_SERVER);

      // Wait for the compaction to be committed.
      LOG.info("Waiting for compaction completed marker to disappear");
      Stream<ExternalCompactionFinalState> fs2 =
          ExternalCompactionUtils.getFinalStatesForTable(getCluster(), tid);
      while (fs2.count() != 0) {
        LOG.info("Waiting for compaction completed marker to disappear");
        UtilWaitThread.sleep(500);
        fs2 = ExternalCompactionUtils.getFinalStatesForTable(getCluster(), tid);
      }
      ExternalCompactionUtils.verify(client, table3, 2);
    } finally {
      // tserv is still non-null only if the test failed before stopping it above
      ExternalCompactionUtils.stopProcesses(tserv, c1, coord);
    }

  }

  /** Selector that picks only files whose name starts with "F". */
  public static class FSelector implements CompactionSelector {

    @Override
    public void init(InitParameters iparams) {}

    @Override
    public Selection select(SelectionParameters sparams) {
      List<CompactableFile> toCompact = sparams.getAvailableFiles().stream()
          .filter(cf -> cf.getFileName().startsWith("F")).collect(Collectors.toList());
      return new Selection(toCompact);
    }

  }

  /**
   * Runs a full compaction with modulus 17, adds a second batch of data, then runs a selector
   * driven partial compaction with modulus 19 and checks each half was filtered with its own
   * modulus.
   */
  @Test
  public void testPartialCompaction() throws Exception {
    ProcessInfo c1 = null, coord = null;
    String tableName = getUniqueNames(1)[0];
    try (final AccumuloClient client =
        Accumulo.newClient().from(getCluster().getClientProperties()).build()) {

      c1 = ((MiniAccumuloClusterImpl) getCluster()).exec(Compactor.class, "-q", "DCQ1");
      coord = ((MiniAccumuloClusterImpl) getCluster()).exec(CompactionCoordinator.class);

      ExternalCompactionUtils.createTable(client, tableName, "cs1");

      ExternalCompactionUtils.writeData(client, tableName);
      // This should create an A file
      ExternalCompactionUtils.compact(client, tableName, 17, "DCQ1", true);
      ExternalCompactionUtils.verify(client, tableName, 17);

      try (BatchWriter bw = client.createBatchWriter(tableName)) {
        for (int i = ExternalCompactionUtils.MAX_DATA; i < ExternalCompactionUtils.MAX_DATA * 2;
            i++) {
          Mutation m = new Mutation(ExternalCompactionUtils.row(i));
          m.put("", "", "" + i);
          bw.addMutation(m);
        }
      }

      // this should create an F file
      client.tableOperations().flush(tableName);

      // run a compaction that only compacts F files
      IteratorSetting iterSetting = new IteratorSetting(100, TestFilter.class);
      // make sure iterator options make it to compactor process
      iterSetting.addOption("expectedQ", "DCQ1");
      // compact F file w/ different modulus and use pmodulus option for partial compaction
      iterSetting.addOption("pmodulus", 19 + "");
      CompactionConfig config = new CompactionConfig().setIterators(List.of(iterSetting))
          .setWait(true).setSelector(new PluginConfig(FSelector.class.getName()));
      client.tableOperations().compact(tableName, config);

      try (Scanner scanner = client.createScanner(tableName)) {
        int count = 0;
        for (Entry<Key,Value> entry : scanner) {

          int v = Integer.parseInt(entry.getValue().toString());
          int modulus = v < ExternalCompactionUtils.MAX_DATA ? 17 : 19;

          assertTrue(String.format("%s %s %d != 0", entry.getValue(), "%", modulus),
              v % modulus == 0);
          count++;
        }

        int expectedCount = 0;
        for (int i = 0; i < ExternalCompactionUtils.MAX_DATA * 2; i++) {
          int modulus = i < ExternalCompactionUtils.MAX_DATA ? 17 : 19;
          if (i % modulus == 0) {
            expectedCount++;
          }
        }

        assertEquals(expectedCount, count);
      }

    } finally {
      ExternalCompactionUtils.stopProcesses(c1, coord);
    }
  }

  /**
   * Returns capture group 1 when {@code pattern} matches the whole of {@code input}.
   *
   * @param input
   *          text to match
   * @param pattern
   *          pre-compiled pattern containing at least one capture group
   * @return the first capture group, or empty when the input does not match
   */
  private static Optional<String> extract(String input, Pattern pattern) {
    Matcher matcher = pattern.matcher(input);
    if (matcher.matches()) {
      return Optional.of(matcher.group(1));
    }

    return Optional.empty();
  }

  /**
   * Tails the tserver metrics file and checks that the queued counts for both external queues
   * reach the expected values and then drop to zero once compactors are started.
   */
  @Test
  public void testMetrics() throws Exception {
    Collection<ProcessReference> tservers =
        ((MiniAccumuloClusterImpl) getCluster()).getProcesses().get(ServerType.TABLET_SERVER);
    assertEquals(2, tservers.size());
    // kill one tserver so that queue metrics are not spread across tservers
    ((MiniAccumuloClusterImpl) getCluster()).killProcess(TABLET_SERVER, tservers.iterator().next());
    ProcessInfo c1 = null, c2 = null, coord = null;
    Tailer tailer = null;
    String[] names = getUniqueNames(2);
    try (final AccumuloClient client =
        Accumulo.newClient().from(getCluster().getClientProperties()).build()) {
      String table1 = names[0];
      ExternalCompactionUtils.createTable(client, table1, "cs1", 5);

      String table2 = names[1];
      ExternalCompactionUtils.createTable(client, table2, "cs2", 10);

      ExternalCompactionUtils.writeData(client, table1);
      ExternalCompactionUtils.writeData(client, table2);

      LinkedBlockingQueue<String> queueMetrics = new LinkedBlockingQueue<>();

      tailer = Tailer.create(new File("./target/tserver.metrics"), new TailerListenerAdapter() {
        @Override
        public void handle(final String line) {
          extract(line, DCQ1_QUEUED).ifPresent(queueMetrics::add);
          extract(line, DCQ2_QUEUED).ifPresent(queueMetrics::add);
        }
      });

      ExternalCompactionUtils.compact(client, table1, 7, "DCQ1", false);
      ExternalCompactionUtils.compact(client, table2, 13, "DCQ2", false);

      boolean sawDCQ1_5 = false;
      boolean sawDCQ2_10 = false;

      // wait until expected number of queued are seen in metrics
      while (!sawDCQ1_5 || !sawDCQ2_10) {
        String qm = queueMetrics.take();
        sawDCQ1_5 |= qm.equals("DCQ1_queued=5");
        sawDCQ2_10 |= qm.equals("DCQ2_queued=10");
      }

      // start compactors
      c1 = ((MiniAccumuloClusterImpl) getCluster()).exec(Compactor.class, "-q", "DCQ1");
      c2 = ((MiniAccumuloClusterImpl) getCluster()).exec(Compactor.class, "-q", "DCQ2");
      coord = ((MiniAccumuloClusterImpl) getCluster()).exec(CompactionCoordinator.class);

      boolean sawDCQ1_0 = false;
      boolean sawDCQ2_0 = false;

      // wait until queued goes to zero in metrics
      while (!sawDCQ1_0 || !sawDCQ2_0) {
        String qm = queueMetrics.take();
        sawDCQ1_0 |= qm.equals("DCQ1_queued=0");
        sawDCQ2_0 |= qm.equals("DCQ2_queued=0");
      }

      tailer.stop();
      tailer = null; // already stopped, nothing left for the finally block to do

      // Wait for all external compactions to complete
      long count;
      do {
        UtilWaitThread.sleep(100);
        try (TabletsMetadata tm = getCluster().getServerContext().getAmple().readTablets()
            .forLevel(DataLevel.USER).fetch(ColumnType.ECOMP).build()) {
          count = tm.stream().flatMap(t -> t.getExternalCompactions().keySet().stream()).count();
        }
      } while (count > 0);

      ExternalCompactionUtils.verify(client, table1, 7);
      ExternalCompactionUtils.verify(client, table2, 13);

    } finally {
      // Still non-null only when the test failed before stopping the tailer itself
      if (tailer != null) {
        tailer.stop();
      }
      try {
        ExternalCompactionUtils.stopProcesses(c1, c2, coord);
      } finally {
        // One tablet server was killed at the start of this test, start the TabletServers again
        ((MiniAccumuloClusterImpl) getCluster()).getClusterControl()
            .start(ServerType.TABLET_SERVER);
      }
    }
  }

}
diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_2_IT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_2_IT.java new file mode 100644 index 0000000..7e8a75a --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_2_IT.java @@ -0,0 +1,364 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.compaction; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.stream.Collectors; + +import org.apache.accumulo.compactor.Compactor; +import org.apache.accumulo.coordinator.ExternalCompactionMetrics; +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.clientImpl.Tables; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; +import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; +import org.apache.accumulo.core.metadata.schema.TabletsMetadata; +import org.apache.accumulo.fate.util.UtilWaitThread; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import 
org.apache.accumulo.harness.MiniClusterConfigurationCallback; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl.ProcessInfo; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +
/**
 * Integration tests for external compactions that are interrupted by a table operation (split,
 * offline, cancel, delete). Every test starts with an ExternalDoNothingCompactor on queue DCQ1
 * and a TestCompactionCoordinator, both launched in {@link #setUp()}.
 */
public class ExternalCompaction_2_IT extends AccumuloClusterHarness
    implements MiniClusterConfigurationCallback {

  private static final Logger LOG = LoggerFactory.getLogger(ExternalCompaction_2_IT.class);

  @Override
  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreSite) {
    ExternalCompactionUtils.configureMiniCluster(cfg, coreSite);
  }

  private ProcessInfo testCompactor = null;
  private ProcessInfo testCoordinator = null;

  /** Sets up the cluster, then launches the do-nothing compactor and the test coordinator. */
  @Before
  public void setUp() throws Exception {
    super.setupCluster();
    testCompactor = ((MiniAccumuloClusterImpl) getCluster()).exec(ExternalDoNothingCompactor.class,
        "-q", "DCQ1");
    testCoordinator = ExternalCompactionUtils.startCoordinator(
        ((MiniAccumuloClusterImpl) getCluster()), TestCompactionCoordinator.class);
  }

  /** Tears down the cluster and stops the processes launched by {@link #setUp()}. */
  @After
  public void tearDown() throws Exception {
    try {
      super.teardownCluster();
    } finally {
      // Runs even when the cluster teardown throws, so the helper processes are not leaked.
      try {
        ExternalCompactionUtils.stopProcesses(testCompactor, testCoordinator);
      } finally {
        testCompactor = null;
        testCoordinator = null;
      }
    }
  }

  /**
   * Splits a table while an external compaction is outstanding and waits for the coordinator to
   * report the compaction as failed.
   */
  @Test
  public void testSplitDuringExternalCompaction() throws Exception {
    String table1 = this.getUniqueNames(1)[0];
    try (AccumuloClient client =
        Accumulo.newClient().from(getCluster().getClientProperties()).build()) {

      ExternalCompactionMetrics startingMetrics = ExternalCompactionUtils.getCoordinatorMetrics();

      ExternalCompactionUtils.createTable(client, table1, "cs1");
      TableId tid = Tables.getTableId(getCluster().getServerContext(), table1);
      ExternalCompactionUtils.writeData(client, table1);
      ExternalCompactionUtils.compact(client, table1, 2, "DCQ1", false);

      // Wait for the compaction to start by waiting for 1 external compaction column
      Set<ExternalCompactionId> ecids = new HashSet<>();
      do {
        UtilWaitThread.sleep(50);
        try (TabletsMetadata tm = getCluster().getServerContext().getAmple().readTablets()
            .forTable(tid).fetch(ColumnType.ECOMP).build()) {
          tm.stream().flatMap(t -> t.getExternalCompactions().keySet().stream())
              .forEach(ecids::add);
        }
      } while (ecids.isEmpty());

      // ExternalDoNothingCompactor will not compact, it will wait, split the table.
      SortedSet<Text> splits = new TreeSet<>();
      int jump = ExternalCompactionUtils.MAX_DATA / 5;
      for (int r = jump; r < ExternalCompactionUtils.MAX_DATA; r += jump) {
        splits.add(new Text(ExternalCompactionUtils.row(r)));
      }

      client.tableOperations().addSplits(table1, splits);

      // wait for failure or test timeout
      ExternalCompactionMetrics metrics = ExternalCompactionUtils.getCoordinatorMetrics();
      while (metrics.getFailed() == startingMetrics.getFailed()) {
        UtilWaitThread.sleep(250);
        metrics = ExternalCompactionUtils.getCoordinatorMetrics();
      }

      // Check that there is one failed compaction in the coordinator metrics
      assertTrue(metrics.getStarted() > 0);
      assertEquals(startingMetrics.getCompleted(), metrics.getCompleted());
      assertTrue(metrics.getFailed() > startingMetrics.getFailed());

      // ensure compaction ids were deleted by split operation from metadata table
      try (TabletsMetadata tm = getCluster().getServerContext().getAmple().readTablets()
          .forTable(tid).fetch(ColumnType.ECOMP).build()) {
        Set<ExternalCompactionId> ecids2 = tm.stream()
            .flatMap(t -> t.getExternalCompactions().keySet().stream()).collect(Collectors.toSet());
        assertTrue(Collections.disjoint(ecids, ecids2));
      }
    }
  }

  /**
   * Takes a table offline as soon as a compaction starts, checks that a final state is recorded
   * while it is offline, then brings it online and waits for the final state to be removed.
   */
  @Test
  public void testExternalCompactionsRunWithTableOffline() throws Exception {
    ExternalCompactionMetrics startingMetrics = ExternalCompactionUtils.getCoordinatorMetrics();
    ExternalCompactionUtils.stopProcesses(testCoordinator, testCompactor);
    String table1 = this.getUniqueNames(1)[0];
    // Declared outside the try so the finally block can stop whatever was actually started.
    ProcessInfo coord = null, comp = null;
    try (AccumuloClient client =
        Accumulo.newClient().from(getCluster().getClientProperties()).build()) {

      ExternalCompactionUtils.createTable(client, table1, "cs1");
      // set compaction ratio to 1 so that majc occurs naturally, not user compaction
      // user compaction blocks merge
      client.tableOperations().setProperty(table1, Property.TABLE_MAJC_RATIO.toString(), "1.0");
      // cause multiple rfiles to be created
      ExternalCompactionUtils.writeData(client, table1);
      ExternalCompactionUtils.writeData(client, table1);
      ExternalCompactionUtils.writeData(client, table1);
      ExternalCompactionUtils.writeData(client, table1);

      coord = ExternalCompactionUtils.startCoordinator(((MiniAccumuloClusterImpl) getCluster()),
          TestCompactionCoordinatorForOfflineTable.class);

      ExternalCompactionMetrics metrics = ExternalCompactionUtils.getCoordinatorMetrics();

      final long started = metrics.getStarted();
      // fail() on the helper thread cannot fail the test, so the error is handed back to the
      // test thread through this holder; Thread.join() makes the write visible.
      final Exception[] offlineError = new Exception[1];
      // Offline the table when the compaction starts
      Thread t = new Thread(() -> {
        try {
          ExternalCompactionMetrics metrics2 = ExternalCompactionUtils.getCoordinatorMetrics();
          // Deliberately polls without sleeping so the table goes offline as soon as possible
          // after the compaction is reported as started.
          while (metrics2.getStarted() == started) {
            metrics2 = ExternalCompactionUtils.getCoordinatorMetrics();
          }
          client.tableOperations().offline(table1, false);
        } catch (Exception e) {
          LOG.error("Error: ", e);
          offlineError[0] = e;
        }
      });
      t.start();

      TableId tid = Tables.getTableId(getCluster().getServerContext(), table1);
      // Confirm that no final state is in the metadata table
      assertEquals(0, ExternalCompactionUtils.getFinalStatesForTable(getCluster(), tid).count());

      // Start the compactor
      comp = ((MiniAccumuloClusterImpl) getCluster()).exec(Compactor.class, "-q", "DCQ1");

      t.join();
      if (offlineError[0] != null) {
        fail("Failed to offline table: " + offlineError[0]);
      }

      // wait for completed or test timeout
      ExternalCompactionMetrics metrics3 = ExternalCompactionUtils.getCoordinatorMetrics();
      while (metrics3.getCompleted() == metrics.getCompleted()) {
        UtilWaitThread.sleep(250);
        metrics3 = ExternalCompactionUtils.getCoordinatorMetrics();
      }

      // Confirm that final state is in the metadata table
      assertEquals(1, ExternalCompactionUtils.getFinalStatesForTable(getCluster(), tid).count());

      // Online the table
      client.tableOperations().online(table1);

      // wait for compaction to be committed by tserver or test timeout
      long finalStateCount =
          ExternalCompactionUtils.getFinalStatesForTable(getCluster(), tid).count();
      while (finalStateCount > 0) {
        UtilWaitThread.sleep(250);
        finalStateCount = ExternalCompactionUtils.getFinalStatesForTable(getCluster(), tid).count();
      }

      // Check that the compaction succeeded
      metrics = ExternalCompactionUtils.getCoordinatorMetrics();
      assertTrue(metrics.getStarted() > startingMetrics.getStarted());
      assertTrue(metrics.getCompleted() > startingMetrics.getCompleted());
      assertEquals(startingMetrics.getFailed(), metrics.getFailed());
      assertEquals(0, metrics.getRunning());
    } finally {
      ExternalCompactionUtils.stopProcesses(comp, coord);
    }
  }

  /**
   * Cancels a user compaction once it shows up in the metadata table and waits for the
   * coordinator to report it as failed.
   */
  @Test
  public void testUserCompactionCancellation() throws Exception {
    String table1 = this.getUniqueNames(1)[0];
    try (AccumuloClient client =
        Accumulo.newClient().from(getCluster().getClientProperties()).build()) {

      ExternalCompactionMetrics startingMetrics = ExternalCompactionUtils.getCoordinatorMetrics();

      ExternalCompactionUtils.createTable(client, table1, "cs1");
      ExternalCompactionUtils.writeData(client, table1);
      ExternalCompactionUtils.compact(client, table1, 2, "DCQ1", false);

      TableId tid = Tables.getTableId(getCluster().getServerContext(), table1);
      // Poll until the ECOMP read returns at least one tablet; each reader is closed after use.
      List<TabletMetadata> md = new ArrayList<>();
      while (md.isEmpty()) {
        try (TabletsMetadata tm = getCluster().getServerContext().getAmple().readTablets()
            .forTable(tid).fetch(ColumnType.ECOMP).build()) {
          tm.forEach(md::add);
        }
        if (md.isEmpty()) {
          UtilWaitThread.sleep(50);
        }
      }

      client.tableOperations().cancelCompaction(table1);

      // wait for failure or test timeout
      ExternalCompactionMetrics metrics = ExternalCompactionUtils.getCoordinatorMetrics();
      while (metrics.getFailed() == startingMetrics.getFailed()) {
        UtilWaitThread.sleep(250);
        metrics = ExternalCompactionUtils.getCoordinatorMetrics();
      }

      // Confirm that compaction failed
      assertTrue(metrics.getStarted() > startingMetrics.getStarted());
      assertEquals(0, metrics.getRunning());
      assertEquals(startingMetrics.getCompleted(), metrics.getCompleted());
      assertTrue(metrics.getFailed() > startingMetrics.getFailed());
    }
  }

+ @Test + public void testDeleteTableDuringUserExternalCompaction() throws Exception { + String table1 = this.getUniqueNames(1)[0]; + try (AccumuloClient client = + Accumulo.newClient().from(getCluster().getClientProperties()).build()) { + + ExternalCompactionMetrics startingMetrics = ExternalCompactionUtils.getCoordinatorMetrics(); + + ExternalCompactionUtils.createTable(client, table1, "cs1"); + ExternalCompactionUtils.writeData(client, table1); + ExternalCompactionUtils.compact(client, table1, 2, "DCQ1", false); + + List<TabletMetadata> md = new ArrayList<>(); + TabletsMetadata tm = getCluster().getServerContext().getAmple().readTablets() + .forLevel(DataLevel.USER).fetch(ColumnType.ECOMP).build(); + tm.forEach(t -> md.add(t)); + + while (md.size() == 0) { + tm.close(); + tm = getCluster().getServerContext().getAmple().readTablets().forLevel(DataLevel.USER) + .fetch(ColumnType.ECOMP).build(); + tm.forEach(t -> md.add(t)); + } + + client.tableOperations().delete(table1); + + // wait for failure or test timeout + ExternalCompactionMetrics metrics = ExternalCompactionUtils.getCoordinatorMetrics(); + while (metrics.getFailed() == startingMetrics.getFailed()) { + 
UtilWaitThread.sleep(250); + metrics = ExternalCompactionUtils.getCoordinatorMetrics(); + } + + // Confirm that compaction failed + assertTrue(metrics.getStarted() > startingMetrics.getStarted()); + assertEquals(startingMetrics.getCompleted(), metrics.getCompleted()); + assertTrue(metrics.getFailed() > startingMetrics.getFailed()); + } + } + + @Test + public void testDeleteTableDuringExternalCompaction() throws Exception { + String table1 = this.getUniqueNames(1)[0]; + try (AccumuloClient client = + Accumulo.newClient().from(getCluster().getClientProperties()).build()) { + + ExternalCompactionMetrics startingMetrics = ExternalCompactionUtils.getCoordinatorMetrics(); + + ExternalCompactionUtils.createTable(client, table1, "cs1"); + // set compaction ratio to 1 so that majc occurs naturally, not user compaction + // user compaction blocks delete + client.tableOperations().setProperty(table1, Property.TABLE_MAJC_RATIO.toString(), "1.0"); + // cause multiple rfiles to be created + ExternalCompactionUtils.writeData(client, table1); + ExternalCompactionUtils.writeData(client, table1); + ExternalCompactionUtils.writeData(client, table1); + ExternalCompactionUtils.writeData(client, table1); + + TableId tid = Tables.getTableId(getCluster().getServerContext(), table1); + LOG.warn("Tid for Table {} is {}", table1, tid); + List<TabletMetadata> md = new ArrayList<>(); + TabletsMetadata tm = getCluster().getServerContext().getAmple().readTablets().forTable(tid) + .fetch(ColumnType.ECOMP).build(); + tm.forEach(t -> md.add(t)); + + while (md.size() == 0) { + tm.close(); + tm = getCluster().getServerContext().getAmple().readTablets().forTable(tid) + .fetch(ColumnType.ECOMP).build(); + tm.forEach(t -> md.add(t)); + UtilWaitThread.sleep(250); + } + + client.tableOperations().delete(table1); + + // wait for failure or test timeout + ExternalCompactionMetrics metrics = ExternalCompactionUtils.getCoordinatorMetrics(); + while (metrics.getFailed() == startingMetrics.getFailed()) { + 
UtilWaitThread.sleep(50); + metrics = ExternalCompactionUtils.getCoordinatorMetrics(); + } + + tm = getCluster().getServerContext().getAmple().readTablets().forTable(tid) + .fetch(ColumnType.ECOMP).build(); + assertEquals(0, tm.stream().count()); + tm.close(); + + // The metadata tablets will be deleted from the metadata table because we have deleted the + // table. Verify that the compaction failed by looking at the metrics in the Coordinator. + assertTrue(metrics.getStarted() > startingMetrics.getStarted()); + assertEquals(0, metrics.getRunning()); + assertEquals(startingMetrics.getCompleted(), metrics.getCompleted()); + assertTrue(metrics.getFailed() > startingMetrics.getFailed()); + } + } + +} diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_3_IT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_3_IT.java new file mode 100644 index 0000000..070585c --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_3_IT.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.test.compaction; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.accumulo.coordinator.CompactionCoordinator; +import org.apache.accumulo.coordinator.ExternalCompactionMetrics; +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.clientImpl.Tables; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; +import org.apache.accumulo.core.metadata.schema.TabletsMetadata; +import org.apache.accumulo.fate.util.UtilWaitThread; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.harness.MiniClusterConfigurationCallback; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl.ProcessInfo; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.junit.Test; + +public class ExternalCompaction_3_IT extends AccumuloClusterHarness + implements MiniClusterConfigurationCallback { + + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreSite) { + ExternalCompactionUtils.configureMiniCluster(cfg, coreSite); + } + + @Test + public void testMergeDuringExternalCompaction() throws Exception { + String table1 = this.getUniqueNames(1)[0]; + try (AccumuloClient client = + Accumulo.newClient().from(getCluster().getClientProperties()).build()) { + + 
ExternalCompactionUtils.createTable(client, table1, "cs1", 2); + // set compaction ratio to 1 so that majc occurs naturally, not user compaction + // user compaction blocks merge + client.tableOperations().setProperty(table1, Property.TABLE_MAJC_RATIO.toString(), "1.0"); + // cause multiple rfiles to be created + ExternalCompactionUtils.writeData(client, table1); + ExternalCompactionUtils.writeData(client, table1); + ExternalCompactionUtils.writeData(client, table1); + ExternalCompactionUtils.writeData(client, table1); + + TableId tid = Tables.getTableId(getCluster().getServerContext(), table1); + + ((MiniAccumuloClusterImpl) getCluster()).exec(TestCompactionCoordinator.class); + // Wait for coordinator to start + ExternalCompactionMetrics metrics = null; + while (null == metrics) { + try { + metrics = ExternalCompactionUtils.getCoordinatorMetrics(); + } catch (Exception e) { + UtilWaitThread.sleep(250); + } + } + + ((MiniAccumuloClusterImpl) getCluster()).exec(ExternalDoNothingCompactor.class, "-q", "DCQ1"); + + // Wait for the compaction to start by waiting for 1 external compaction column + Set<ExternalCompactionId> ecids = new HashSet<>(); + do { + UtilWaitThread.sleep(50); + try (TabletsMetadata tm = getCluster().getServerContext().getAmple().readTablets() + .forTable(tid).fetch(ColumnType.ECOMP).build()) { + tm.stream().flatMap(t -> t.getExternalCompactions().keySet().stream()) + .forEach(ecids::add); + } + } while (ecids.isEmpty()); + + var md = new ArrayList<TabletMetadata>(); + try (TabletsMetadata tm = getCluster().getServerContext().getAmple().readTablets() + .forTable(tid).fetch(ColumnType.PREV_ROW).build()) { + tm.forEach(t -> md.add(t)); + assertEquals(2, md.size()); + } + + assertTrue(ExternalCompactionUtils.getCoordinatorMetrics().getFailed() == 0); + + // Merge - blocking operation + Text start = md.get(0).getPrevEndRow(); + Text end = md.get(1).getEndRow(); + client.tableOperations().merge(table1, start, end); + + // wait for failure or test 
timeout + metrics = ExternalCompactionUtils.getCoordinatorMetrics(); + while (metrics.getFailed() == 0) { + UtilWaitThread.sleep(250); + metrics = ExternalCompactionUtils.getCoordinatorMetrics(); + } + + // Check that there is one more failed compaction in the coordinator metrics + assertTrue(metrics.getStarted() > 0); + assertEquals(0, metrics.getCompleted()); + assertTrue(metrics.getFailed() > 0); + + // ensure compaction ids were deleted by merge operation from metadata table + try (TabletsMetadata tm = getCluster().getServerContext().getAmple().readTablets() + .forTable(tid).fetch(ColumnType.ECOMP).build()) { + Set<ExternalCompactionId> ecids2 = tm.stream() + .flatMap(t -> t.getExternalCompactions().keySet().stream()).collect(Collectors.toSet()); + // keep checking until test times out + while (!Collections.disjoint(ecids, ecids2)) { + UtilWaitThread.sleep(25); + ecids2 = tm.stream().flatMap(t -> t.getExternalCompactions().keySet().stream()) + .collect(Collectors.toSet()); + } + } + } + } + + @Test + public void testCoordinatorRestartsDuringCompaction() throws Exception { + String table1 = this.getUniqueNames(1)[0]; + try (AccumuloClient client = + Accumulo.newClient().from(getCluster().getClientProperties()).build()) { + + ExternalCompactionUtils.createTable(client, table1, "cs1", 2); + ExternalCompactionUtils.writeData(client, table1); + + ProcessInfo coord = ExternalCompactionUtils + .startCoordinator(((MiniAccumuloClusterImpl) getCluster()), CompactionCoordinator.class); + ProcessInfo compactor = ((MiniAccumuloClusterImpl) getCluster()) + .exec(ExternalDoNothingCompactor.class, "-q", "DCQ1"); + ExternalCompactionUtils.compact(client, table1, 2, "DCQ1", false); + TableId tid = Tables.getTableId(getCluster().getServerContext(), table1); + // Wait for the compaction to start by waiting for 1 external compaction column + Set<ExternalCompactionId> ecids = new HashSet<>(); + do { + try (TabletsMetadata tm = getCluster().getServerContext().getAmple().readTablets() 
+ .forTable(tid).fetch(ColumnType.ECOMP).build()) { + tm.stream().flatMap(t -> t.getExternalCompactions().keySet().stream()) + .forEach(ecids::add); + UtilWaitThread.sleep(50); + } + } while (ecids.isEmpty()); + + // Stop the Coordinator + ExternalCompactionUtils.stopProcesses(coord); + + // Start the TestCompactionCoordinator so that we have + // access to the metrics. + ProcessInfo testCoordinator = ExternalCompactionUtils.startCoordinator( + ((MiniAccumuloClusterImpl) getCluster()), TestCompactionCoordinator.class); + + // wait for failure or test timeout + ExternalCompactionMetrics metrics = ExternalCompactionUtils.getCoordinatorMetrics(); + while (metrics.getRunning() == 0) { + UtilWaitThread.sleep(250); + metrics = ExternalCompactionUtils.getCoordinatorMetrics(); + } + ExternalCompactionUtils.stopProcesses(testCoordinator, compactor); + + } + } + +} diff --git a/test/src/main/java/org/apache/accumulo/test/ExternalDoNothingCompactor.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java similarity index 98% rename from test/src/main/java/org/apache/accumulo/test/ExternalDoNothingCompactor.java rename to test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java index 4f99e6d..eb800f9 100644 --- a/test/src/main/java/org/apache/accumulo/test/ExternalDoNothingCompactor.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.accumulo.test; +package org.apache.accumulo.test.compaction; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledThreadPoolExecutor; diff --git a/test/src/main/java/org/apache/accumulo/test/NonCommittingExternalCompactionThriftClientHandler.java b/test/src/main/java/org/apache/accumulo/test/compaction/NonCommittingExternalCompactionThriftClientHandler.java similarity index 97% rename from test/src/main/java/org/apache/accumulo/test/NonCommittingExternalCompactionThriftClientHandler.java rename to test/src/main/java/org/apache/accumulo/test/compaction/NonCommittingExternalCompactionThriftClientHandler.java index dabfe78..bbdbfde 100644 --- a/test/src/main/java/org/apache/accumulo/test/NonCommittingExternalCompactionThriftClientHandler.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/NonCommittingExternalCompactionThriftClientHandler.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.accumulo.test; +package org.apache.accumulo.test.compaction; import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException; import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent; diff --git a/test/src/main/java/org/apache/accumulo/test/SizeCompactionStrategy.java b/test/src/main/java/org/apache/accumulo/test/compaction/SizeCompactionStrategy.java similarity index 97% rename from test/src/main/java/org/apache/accumulo/test/SizeCompactionStrategy.java rename to test/src/main/java/org/apache/accumulo/test/compaction/SizeCompactionStrategy.java index bedf818..db08f46 100644 --- a/test/src/main/java/org/apache/accumulo/test/SizeCompactionStrategy.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/SizeCompactionStrategy.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.accumulo.test; +package org.apache.accumulo.test.compaction; import java.util.Map; import java.util.Map.Entry; diff --git a/test/src/main/java/org/apache/accumulo/test/SplitCancelsMajCIT.java b/test/src/main/java/org/apache/accumulo/test/compaction/SplitCancelsMajCIT.java similarity index 98% rename from test/src/main/java/org/apache/accumulo/test/SplitCancelsMajCIT.java rename to test/src/main/java/org/apache/accumulo/test/compaction/SplitCancelsMajCIT.java index 9e6c2f2..316b88c 100644 --- a/test/src/main/java/org/apache/accumulo/test/SplitCancelsMajCIT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/SplitCancelsMajCIT.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.accumulo.test; +package org.apache.accumulo.test.compaction; import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly; import static org.junit.Assert.assertTrue; diff --git a/test/src/main/java/org/apache/accumulo/test/TestCompactionCoordinator.java b/test/src/main/java/org/apache/accumulo/test/compaction/TestCompactionCoordinator.java similarity index 99% rename from test/src/main/java/org/apache/accumulo/test/TestCompactionCoordinator.java rename to test/src/main/java/org/apache/accumulo/test/compaction/TestCompactionCoordinator.java index 1ed6c65..e36a33e 100644 --- a/test/src/main/java/org/apache/accumulo/test/TestCompactionCoordinator.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/TestCompactionCoordinator.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.accumulo.test; +package org.apache.accumulo.test.compaction; import java.io.IOException; import java.net.UnknownHostException; diff --git a/test/src/main/java/org/apache/accumulo/test/TestCompactionCoordinatorForOfflineTable.java b/test/src/main/java/org/apache/accumulo/test/compaction/TestCompactionCoordinatorForOfflineTable.java similarity index 98% rename from test/src/main/java/org/apache/accumulo/test/TestCompactionCoordinatorForOfflineTable.java rename to test/src/main/java/org/apache/accumulo/test/compaction/TestCompactionCoordinatorForOfflineTable.java index e63ddb0..3f69c05 100644 --- a/test/src/main/java/org/apache/accumulo/test/TestCompactionCoordinatorForOfflineTable.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/TestCompactionCoordinatorForOfflineTable.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.accumulo.test; +package org.apache.accumulo.test.compaction; import java.util.List; import java.util.concurrent.ScheduledThreadPoolExecutor; diff --git a/test/src/main/java/org/apache/accumulo/test/TestCompactionStrategy.java b/test/src/main/java/org/apache/accumulo/test/compaction/TestCompactionStrategy.java similarity index 98% rename from test/src/main/java/org/apache/accumulo/test/TestCompactionStrategy.java rename to test/src/main/java/org/apache/accumulo/test/compaction/TestCompactionStrategy.java index d709d12..3b7c57d 100644 --- a/test/src/main/java/org/apache/accumulo/test/TestCompactionStrategy.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/TestCompactionStrategy.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.accumulo.test; +package org.apache.accumulo.test.compaction; import java.util.Map; diff --git a/test/src/main/java/org/apache/accumulo/test/UserCompactionStrategyIT.java b/test/src/main/java/org/apache/accumulo/test/compaction/UserCompactionStrategyIT.java similarity index 99% rename from test/src/main/java/org/apache/accumulo/test/UserCompactionStrategyIT.java rename to test/src/main/java/org/apache/accumulo/test/compaction/UserCompactionStrategyIT.java index a4b8b1e..519638d 100644 --- a/test/src/main/java/org/apache/accumulo/test/UserCompactionStrategyIT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/UserCompactionStrategyIT.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.accumulo.test; +package org.apache.accumulo.test.compaction; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; diff --git a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java index a489f89..36cf786 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java @@ -65,10 +65,10 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Ta import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.harness.AccumuloClusterHarness; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; -import org.apache.accumulo.test.CompactionExecutorIT; -import org.apache.accumulo.test.ExternalCompactionIT.FSelector; import org.apache.accumulo.test.VerifyIngest; import org.apache.accumulo.test.VerifyIngest.VerifyParams; +import org.apache.accumulo.test.compaction.CompactionExecutorIT; +import org.apache.accumulo.test.compaction.ExternalCompaction_1_IT.FSelector; import 
org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path;