This is an automated email from the ASF dual-hosted git repository. hkeebler pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/master by this push: new 0c5ddc8 Fix #1043 Support stable ~del split points (#1344) 0c5ddc8 is described below commit 0c5ddc8a4ef449ef546a77986d81127fccef67d2 Author: hkeebler <49656678+hkeeb...@users.noreply.github.com> AuthorDate: Tue Sep 17 10:53:48 2019 -0400 Fix #1043 Support stable ~del split points (#1344) * Fix #1043 Support stable ~del split points includes recommended changes --- .../core/metadata/schema/MetadataSchema.java | 11 ++++- .../accumulo/core/metadata/schema/SortSkew.java | 48 ++++++++++++++++++++ .../core/metadata/schema/DeleteMetadataTest.java | 35 +++++++++++++++ .../core/metadata/schema/SortSkewTest.java | 52 ++++++++++++++++++++++ .../accumulo/server/metadata/ServerAmpleImpl.java | 11 ++--- .../accumulo/server/util/ListVolumesUsed.java | 33 ++++++-------- .../test/functional/GarbageCollectorIT.java | 13 +++--- 7 files changed, 168 insertions(+), 35 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java index 9d23ae0..b2fdede 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java @@ -255,12 +255,19 @@ public class MetadataSchema { private static final Section section = new Section(RESERVED_PREFIX + "del", true, RESERVED_PREFIX + "dem", false); + private static final int encoded_prefix_length = + section.getRowPrefix().length() + SortSkew.SORTSKEW_LENGTH; + public static Range getRange() { return section.getRange(); } - public static String getRowPrefix() { - return section.getRowPrefix(); + public static String encodeRow(String value) { + return section.getRowPrefix() + SortSkew.getCode(value) + value; + } + + public static String decodeRow(String row) { + return row.substring(encoded_prefix_length); } } diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/SortSkew.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/SortSkew.java new file mode 100644 index 0000000..665139c --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/SortSkew.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.metadata.schema; + +import static com.google.common.hash.Hashing.murmur3_32; +import static java.nio.charset.StandardCharsets.UTF_8; + +import com.google.common.base.Strings; + +/* + * A subprefix used to remove sort skew from some of the metadata generated entries, for example: file deletes + * prefixed with ~del. NOTE: This is persisted data so any change to this processing should + * consider any existing data. + */ +public class SortSkew { + + // A specified length for the skew code used is necessary to parse the key correctly. + // The Hex code for an integer will always be <= 8 + public static final int SORTSKEW_LENGTH = Integer.BYTES * 2; + + /** + * Creates a left justified hex string for the path hashcode of a deterministic length, therefore + * if necessary it is right padded with zeros + * + * @param keypart + * value to be coded + * @return coded value of keypart + */ + public static String getCode(String keypart) { + int hashCode = murmur3_32().hashString(keypart, UTF_8).asInt(); + return Strings.padStart(Integer.toHexString(hashCode), SORTSKEW_LENGTH, '0'); + } + +} diff --git a/core/src/test/java/org/apache/accumulo/core/metadata/schema/DeleteMetadataTest.java b/core/src/test/java/org/apache/accumulo/core/metadata/schema/DeleteMetadataTest.java new file mode 100644 index 0000000..4890ab4 --- /dev/null +++ b/core/src/test/java/org/apache/accumulo/core/metadata/schema/DeleteMetadataTest.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.metadata.schema; + +import static org.junit.Assert.assertEquals; + +import org.junit.Test; + +public class DeleteMetadataTest { + + @Test + public void encodeRowTest() { + String path = "/dir/testpath"; + assertEquals(path, + MetadataSchema.DeletesSection.decodeRow(MetadataSchema.DeletesSection.encodeRow(path))); + path = "hdfs://localhost:8020/dir/r+/1_table/f$%#"; + assertEquals(path, + MetadataSchema.DeletesSection.decodeRow(MetadataSchema.DeletesSection.encodeRow(path))); + + } +} diff --git a/core/src/test/java/org/apache/accumulo/core/metadata/schema/SortSkewTest.java b/core/src/test/java/org/apache/accumulo/core/metadata/schema/SortSkewTest.java new file mode 100644 index 0000000..d71163d --- /dev/null +++ b/core/src/test/java/org/apache/accumulo/core/metadata/schema/SortSkewTest.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.metadata.schema; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +import org.junit.Test; + +public class SortSkewTest { + private static final String shortpath = "1"; + private static final String longpath = + "/verylongpath/12345679xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxiiiiiiiiiiiiiiiiii/zzzzzzzzzzzzzzzzzzzzz" + + "aaaaaaaaaaaaabbbbbbbbbbbbbbbbbbbccccccccccccccccccccccccccxxxxxxxxxxxxxxxxxxxxxxxxyyyyyyyyyyyyyyyyzzzzzzzzzzzzzzzz";; + // these are values previously generated from SortSkew.getCode() for the above + private static final String shortcode = "9416ac93"; + private static final String longcode = "b9ddf266"; + + @Test + public void verifyCodeSize() { + int expectedLength = SortSkew.SORTSKEW_LENGTH; + assertEquals(expectedLength, SortSkew.getCode(shortpath).length()); + assertEquals(expectedLength, SortSkew.getCode(longpath).length()); + } + + @Test + public void verifySame() { + assertEquals(SortSkew.getCode("123"), SortSkew.getCode("123")); + assertNotEquals(SortSkew.getCode("123"), SortSkew.getCode("321")); + } + + @Test + public void verifyStable() { + assertEquals(shortcode, SortSkew.getCode(shortpath)); + assertEquals(longcode, SortSkew.getCode(longpath)); + } + +} diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java index ce5f4f3..df59341 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java @@ -134,7 +134,7 @@ public class ServerAmpleImpl extends AmpleImpl implements Ample { try (BatchWriter writer = context.createBatchWriter(level.metaTable())) { for (String path : paths) { - Mutation m = new Mutation(MetadataSchema.DeletesSection.getRowPrefix() + path); + Mutation m = new Mutation(MetadataSchema.DeletesSection.encodeRow(path)); m.putDelete(EMPTY_TEXT, EMPTY_TEXT); writer.addMutation(m); } @@ -157,7 +157,7 @@ public class ServerAmpleImpl extends AmpleImpl implements Ample { } else if (level == DataLevel.METADATA || level == DataLevel.USER) { Range range = MetadataSchema.DeletesSection.getRange(); if (continuePoint != null && !continuePoint.isEmpty()) { - String continueRow = MetadataSchema.DeletesSection.getRowPrefix() + continuePoint; + String continueRow = MetadataSchema.DeletesSection.encodeRow(continuePoint); range = new Range(new Key(continueRow).followingKey(PartialKey.ROW), true, range.getEndKey(), range.isEndKeyInclusive()); } @@ -170,8 +170,8 @@ public class ServerAmpleImpl extends AmpleImpl implements Ample { } scanner.setRange(range); - return Iterators.transform(scanner.iterator(), entry -> entry.getKey().getRow().toString() - .substring(MetadataSchema.DeletesSection.getRowPrefix().length())); + return Iterators.transform(scanner.iterator(), + entry -> MetadataSchema.DeletesSection.decodeRow(entry.getKey().getRow().toString())); } else { throw new IllegalArgumentException(); @@ -196,7 +196,8 @@ public class ServerAmpleImpl extends AmpleImpl implements Ample { public static Mutation createDeleteMutation(ServerContext context, TableId tableId, String pathToRemove) { Path path = context.getVolumeManager().getFullPath(tableId, pathToRemove); - Mutation delFlag = new Mutation(new Text(MetadataSchema.DeletesSection.getRowPrefix() + path)); + Mutation delFlag = + new Mutation(new Text(MetadataSchema.DeletesSection.encodeRow(path.toString()))); delFlag.put(EMPTY_TEXT, EMPTY_TEXT, new Value(new byte[] {})); return delFlag; } diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java b/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java index 5e758a2..1417dd5 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java @@ -16,6 +16,7 @@ */ package org.apache.accumulo.server.util; +import java.util.Iterator; import java.util.Map.Entry; import java.util.TreeSet; @@ -23,8 +24,8 @@ import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.conf.SiteConfiguration; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.RootTable; +import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.MetadataSchema; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.security.Authorizations; @@ -78,11 +79,11 @@ public class ListVolumesUsed { } - private static void listTable(String name, ServerContext context) throws Exception { + private static void listTable(Ample.DataLevel level, ServerContext context) throws Exception { - System.out.println("Listing volumes referenced in " + name + " tablets section"); + System.out.println("Listing volumes referenced in " + level + " tablets section"); - Scanner scanner = context.createScanner(name, Authorizations.EMPTY); + Scanner scanner = context.createScanner(level.metaTable(), Authorizations.EMPTY); scanner.setRange(MetadataSchema.TabletsSection.getRange()); scanner.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME); @@ -109,33 +110,25 @@ public class ListVolumesUsed { System.out.println("\tVolume : " + volume); } + System.out.println("Listing volumes referenced in " + level + + " deletes section (volume replacement occurrs at deletion time)"); volumes.clear(); - scanner.clearColumns(); - scanner.setRange(MetadataSchema.DeletesSection.getRange()); - - for (Entry<Key,Value> entry : scanner) { - String delPath = entry.getKey().getRow().toString() - .substring(MetadataSchema.DeletesSection.getRowPrefix().length()); - volumes.add(getTableURI(delPath)); + Iterator<String> delPaths = context.getAmple().getGcCandidates(level, ""); + while (delPaths.hasNext()) { + volumes.add(getTableURI(delPaths.next())); } - - System.out.println("Listing volumes referenced in " + name - + " deletes section (volume replacement occurrs at deletion time)"); - for (String volume : volumes) { System.out.println("\tVolume : " + volume); } + System.out.println("Listing volumes referenced in " + level + " current logs"); volumes.clear(); WalStateManager wals = new WalStateManager(context); for (Path path : wals.getAllState().keySet()) { volumes.add(getLogURI(path.toString())); } - - System.out.println("Listing volumes referenced in " + name + " current logs"); - for (String volume : volumes) { System.out.println("\tVolume : " + volume); } @@ -144,9 +137,9 @@ public class ListVolumesUsed { public static void listVolumes(ServerContext context) throws Exception { listZookeeper(context); System.out.println(); - listTable(RootTable.NAME, context); + listTable(Ample.DataLevel.METADATA, context); System.out.println(); - listTable(MetadataTable.NAME, context); + listTable(Ample.DataLevel.USER, context); } } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java b/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java index 1856efb..7080cf6 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java @@ -186,7 +186,7 @@ public class GarbageCollectorIT extends ConfigurableMacBase { } private Mutation createDelMutation(String path, String cf, String cq, String val) { - Text row = new Text(MetadataSchema.DeletesSection.getRowPrefix() + path); + Text row = new Text(MetadataSchema.DeletesSection.encodeRow(path)); Mutation delFlag = new Mutation(row); delFlag.put(cf, cq, val); return delFlag; @@ -297,18 +297,15 @@ public class GarbageCollectorIT extends ConfigurableMacBase { return Iterators.size(Arrays.asList(cluster.getFileSystem().globStatus(path)).iterator()); } - public static void addEntries(AccumuloClient client) throws Exception { + private void addEntries(AccumuloClient client) throws Exception { client.securityOperations().grantTablePermission(client.whoami(), MetadataTable.NAME, TablePermission.WRITE); try (BatchWriter bw = client.createBatchWriter(MetadataTable.NAME)) { for (int i = 0; i < 100000; ++i) { final Text emptyText = new Text(""); - Text row = - new Text(String.format("%s/%020d/%s", MetadataSchema.DeletesSection.getRowPrefix(), i, - "aaaaaaaaaabbbbbbbbbbccccccccccddddddddddeeeeeeeeee" - + "ffffffffffgggggggggghhhhhhhhhhiiiiiiiiiijjjjjjjjjj")); - Mutation delFlag = new Mutation(row); - delFlag.put(emptyText, emptyText, new Value(new byte[] {})); + String longpath = "aaaaaaaaaabbbbbbbbbbccccccccccddddddddddeeeeeeeeee" + + "ffffffffffgggggggggghhhhhhhhhhiiiiiiiiiijjjjjjjjjj"; + Mutation delFlag = createDelMutation(String.format("/%020d/%s", i, longpath), "", "", ""); bw.addMutation(delFlag); } }