This is an automated email from the ASF dual-hosted git repository. mmiller pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new 52e80b1 Create FlushNoFileIT and fix NPE in tablet file flush code (#2342) 52e80b1 is described below commit 52e80b1d11f0093a07a2d886d85aa74f7998096b Author: Mike Miller <mmil...@apache.org> AuthorDate: Tue Nov 9 16:55:06 2021 -0500 Create FlushNoFileIT and fix NPE in tablet file flush code (#2342) * Make flush methods return Optional<StoredTabletFile> to handle case when no file is written * Clean up a bunch of comments in flush code * Create FlushNoFileIT for testing flush that doesn't write out a file * Add checkFlushId to FunctionalTestUtils * Update TabletLogger.flushed() for when no file is written Co-authored-by: Keith Turner <ktur...@apache.org> --- .../apache/accumulo/core/logging/TabletLogger.java | 9 +- .../accumulo/server/util/ManagerMetadataUtil.java | 21 ++- .../accumulo/tserver/tablet/DatafileManager.java | 36 +++--- .../org/apache/accumulo/tserver/tablet/Tablet.java | 10 +- .../accumulo/test/functional/FlushNoFileIT.java | 142 +++++++++++++++++++++ .../test/functional/FunctionalTestUtils.java | 35 +++++ 6 files changed, 220 insertions(+), 33 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/logging/TabletLogger.java b/core/src/main/java/org/apache/accumulo/core/logging/TabletLogger.java index ab8808a..7ab5341 100644 --- a/core/src/main/java/org/apache/accumulo/core/logging/TabletLogger.java +++ b/core/src/main/java/org/apache/accumulo/core/logging/TabletLogger.java @@ -22,11 +22,13 @@ import static java.util.stream.Collectors.toList; import java.util.Collection; import java.util.List; +import java.util.Optional; import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.client.admin.CompactionConfig; import org.apache.accumulo.core.client.admin.compaction.CompactableFile; import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.TServerInstance; import 
org.apache.accumulo.core.metadata.TabletFile; import org.apache.accumulo.core.spi.compaction.CompactionJob; @@ -143,8 +145,11 @@ public class TabletLogger { asFileNames(job.getFiles())); } - public static void flushed(KeyExtent extent, TabletFile newDatafile) { - fileLog.debug("Flushed {} created {} from [memory]", extent, newDatafile); + public static void flushed(KeyExtent extent, Optional<StoredTabletFile> newDatafile) { + if (newDatafile.isPresent()) + fileLog.debug("Flushed {} created {} from [memory]", extent, newDatafile.get()); + else + fileLog.debug("Flushed {} from [memory] but no file was written.", extent); } public static void bulkImported(KeyExtent extent, TabletFile file) { diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ManagerMetadataUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/ManagerMetadataUtil.java index 9c557c6..3728377 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/ManagerMetadataUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/ManagerMetadataUtil.java @@ -215,23 +215,22 @@ public class ManagerMetadataUtil { } /** - * new data file update function adds one data file to a tablet's list - * - * @param path - * should be relative to the table directory - * + * Update tablet file data from flush. Returns a StoredTabletFile if there are data entries. 
*/ - public static StoredTabletFile updateTabletDataFile(ServerContext context, KeyExtent extent, - TabletFile path, DataFileValue dfv, MetadataTime time, String address, ServiceLock zooLock, - Set<String> unusedWalLogs, TServerInstance lastLocation, long flushId) { + public static Optional<StoredTabletFile> updateTabletDataFile(ServerContext context, + KeyExtent extent, TabletFile newDatafile, DataFileValue dfv, MetadataTime time, + String address, ServiceLock zooLock, Set<String> unusedWalLogs, TServerInstance lastLocation, + long flushId) { TabletMutator tablet = context.getAmple().mutateTablet(extent); - StoredTabletFile newFile = null; + // if there are no entries, the path doesn't get stored in metadata table, only the flush ID + Optional<StoredTabletFile> newFile = Optional.empty(); + // if entries are present, write the path to the metadata table if (dfv.getNumEntries() > 0) { - tablet.putFile(path, dfv); + tablet.putFile(newDatafile, dfv); tablet.putTime(time); - newFile = path.insert(); + newFile = Optional.of(newDatafile.insert()); TServerInstance self = getTServerInstance(address, zooLock); tablet.putLocation(self, LocationType.LAST); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java index d17fcbc..8d9c33d 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java @@ -274,9 +274,14 @@ class DatafileManager { return newFiles.keySet(); } - StoredTabletFile bringMinorCompactionOnline(TabletFile tmpDatafile, TabletFile newDatafile, - DataFileValue dfv, CommitSession commitSession, long flushId) { - StoredTabletFile newFile; + /** + * Returns Optional of the new file created. It is possible that the file was just flushed with no + * entries so was not inserted into the metadata.
In this case empty is returned. If the file was + * stored in the metadata table, then StoredTabletFile will be returned. + */ + Optional<StoredTabletFile> bringMinorCompactionOnline(TabletFile tmpDatafile, + TabletFile newDatafile, DataFileValue dfv, CommitSession commitSession, long flushId) { + Optional<StoredTabletFile> newFile; // rename before putting in metadata table, so files in metadata table should // always exist boolean attemptedRename = false; @@ -284,6 +289,7 @@ class DatafileManager { do { try { if (dfv.getNumEntries() == 0) { + log.debug("No data entries so delete temporary file {}", tmpDatafile); vm.deleteRecursively(tmpDatafile.getPath()); } else { if (!attemptedRename && vm.exists(newDatafile.getPath())) { @@ -328,13 +334,9 @@ class DatafileManager { } try { // the order of writing to metadata and walog is important in the face of machine/process - // failures - // need to write to metadata before writing to walog, when things are done in the reverse - // order - // data could be lost... the minor compaction start even should be written before the - // following metadata - // write is made - + // failures need to write to metadata before writing to walog, when things are done in the + // reverse order data could be lost... the minor compaction start event should be written + // before the following metadata write is made newFile = tablet.updateTabletDataFile(commitSession.getMaxCommittedTime(), newDatafile, dfv, unusedWalLogs, flushId); @@ -362,8 +364,7 @@ class DatafileManager { do { try { // the purpose of making this update use the new commit session, instead of the old one - // passed in, - // is because the new one will reference the logs used by current memory... + // passed in, is because the new one will reference the logs used by current memory...
tablet.getTabletServer().minorCompactionFinished( tablet.getTabletMemory().getCommitSession(), commitSession.getWALogSeq() + 2); @@ -377,11 +378,12 @@ class DatafileManager { synchronized (tablet) { t1 = System.currentTimeMillis(); - if (dfv.getNumEntries() > 0) { - if (datafileSizes.containsKey(newFile)) { - log.error("Adding file that is already in set {}", newFile); + if (dfv.getNumEntries() > 0 && newFile.isPresent()) { + StoredTabletFile newFileStored = newFile.get(); + if (datafileSizes.containsKey(newFileStored)) { + log.error("Adding file that is already in set {}", newFileStored); } - datafileSizes.put(newFile, dfv); + datafileSizes.put(newFileStored, dfv); } tablet.flushComplete(flushId); @@ -389,7 +391,7 @@ class DatafileManager { t2 = System.currentTimeMillis(); } - TabletLogger.flushed(tablet.getExtent(), newDatafile); + TabletLogger.flushed(tablet.getExtent(), newFile); if (log.isTraceEnabled()) { log.trace(String.format("MinC finish lock %.2f secs %s", (t2 - t1) / 1000.0, diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index da24549..c4597ec 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -35,6 +35,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; @@ -819,7 +820,7 @@ public class Tablet { var storedFile = getDatafileManager().bringMinorCompactionOnline(tmpDatafile, newDatafile, new DataFileValue(stats.getFileSize(), stats.getEntriesWritten()), commitSession, flushId); - compactable.filesAdded(true, List.of(storedFile)); + storedFile.ifPresent(stf -> compactable.filesAdded(true, List.of(stf))); } catch (Exception e) { TraceUtil.setException(span2, e, 
true); throw e; @@ -2204,8 +2205,11 @@ public class Tablet { } - public StoredTabletFile updateTabletDataFile(long maxCommittedTime, TabletFile newDatafile, - DataFileValue dfv, Set<String> unusedWalLogs, long flushId) { + /** + * Update tablet file data from flush. Returns a StoredTabletFile if there are data entries. + */ + public Optional<StoredTabletFile> updateTabletDataFile(long maxCommittedTime, + TabletFile newDatafile, DataFileValue dfv, Set<String> unusedWalLogs, long flushId) { synchronized (timeLock) { if (maxCommittedTime > persistedTime) { persistedTime = maxCommittedTime; diff --git a/test/src/main/java/org/apache/accumulo/test/functional/FlushNoFileIT.java b/test/src/main/java/org/apache/accumulo/test/functional/FlushNoFileIT.java new file mode 100644 index 0000000..855a2a5 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/functional/FlushNoFileIT.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.test.functional; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.Collection; +import java.util.EnumSet; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; + +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.admin.NewTableConfiguration; +import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.IteratorEnvironment; +import org.apache.accumulo.core.iterators.IteratorUtil; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.hadoop.io.Text; +import org.junit.Test; + +import com.google.common.collect.Iterables; + +/** + * Tests that Accumulo will flush but not create a file that has 0 entries. 
+ */ +public class FlushNoFileIT extends AccumuloClusterHarness { + + @Override + protected int defaultTimeoutSeconds() { + return 60; + } + + @Test + public void test() throws Exception { + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + + String tableName = getUniqueNames(1)[0]; + + NewTableConfiguration ntc = new NewTableConfiguration(); + IteratorSetting iteratorSetting = new IteratorSetting(20, NullIterator.class); + ntc.attachIterator(iteratorSetting, EnumSet.of(IteratorUtil.IteratorScope.minc)); + ntc.withSplits(new TreeSet<>(Set.of(new Text("a"), new Text("s")))); + + c.tableOperations().create(tableName, ntc); + TableId tableId = TableId.of(c.tableOperations().tableIdMap().get(tableName)); + + try (BatchWriter bw = c.createBatchWriter(tableName)) { + Mutation m = new Mutation(new Text("r1")); + m.put("acf", tableName, "1"); + bw.addMutation(m); + } + + FunctionalTestUtils.checkRFiles(c, tableName, 3, 3, 0, 0); + + c.tableOperations().flush(tableName, null, null, true); + + FunctionalTestUtils.checkRFiles(c, tableName, 3, 3, 0, 0); + + long flushId = FunctionalTestUtils.checkFlushId((ClientContext) c, tableId, 0); + + try (BatchWriter bw = c.createBatchWriter(tableName)) { + Mutation m = new Mutation(new Text("r2")); + m.put("acf", tableName, "1"); + bw.addMutation(m); + } + + c.tableOperations().flush(tableName, null, null, true); + + FunctionalTestUtils.checkRFiles(c, tableName, 3, 3, 0, 0); + + long secondFlushId = FunctionalTestUtils.checkFlushId((ClientContext) c, tableId, flushId); + assertTrue("Flush ID did not change", secondFlushId > flushId); + + try (Scanner scanner = c.createScanner(tableName)) { + assertEquals("Expected 0 Entries in table", 0, Iterables.size(scanner)); + } + } + } + + public static class NullIterator implements SortedKeyValueIterator<Key,Value> { + + @Override + public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, + IteratorEnvironment env) {} + + @Override + 
public boolean hasTop() { + return false; + } + + @Override + public void next() throws IOException {} + + @Override + public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) {} + + @Override + public Key getTopKey() { + return null; + } + + @Override + public Value getTopValue() { + return null; + } + + @Override + public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) { + return null; + } + } +} diff --git a/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java b/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java index 6113c01..e73ee1e 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java @@ -19,6 +19,7 @@ package org.apache.accumulo.test.functional; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FLUSH_ID; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -35,6 +36,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map.Entry; +import java.util.OptionalLong; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; @@ -58,6 +60,8 @@ import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.TabletsMetadata; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.fate.AdminUtil; import org.apache.accumulo.fate.AdminUtil.FateStatus; 
@@ -218,4 +222,35 @@ public class FunctionalTestUtils { throw new RuntimeException(e); } } + + /** + * Verify that flush ID gets updated properly and is the same for all tablets. + */ + static long checkFlushId(ClientContext c, TableId tableId, long prevFlushID) throws Exception { + try (TabletsMetadata metaScan = + c.getAmple().readTablets().forTable(tableId).fetch(FLUSH_ID).checkConsistency().build()) { + + long flushId = 0, prevTabletFlushId = 0; + for (TabletMetadata tabletMetadata : metaScan) { + OptionalLong optFlushId = tabletMetadata.getFlushId(); + if (optFlushId.isPresent()) { + flushId = optFlushId.getAsLong(); + if (prevTabletFlushId > 0 && prevTabletFlushId != flushId) { + throw new Exception("Flush ID different between tablets"); + } else { + prevTabletFlushId = flushId; + } + } else { + throw new Exception("Missing flush ID"); + } + } + + if (prevFlushID >= flushId) { + throw new Exception( + "Flush ID did not increase. prevFlushID: " + prevFlushID + " current: " + flushId); + } + + return flushId; + } + } }