This is an automated email from the ASF dual-hosted git repository.

mmiller pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 52e80b1  Create FlushNoFileIT and fix NPE in tablet file flush code 
(#2342)
52e80b1 is described below

commit 52e80b1d11f0093a07a2d886d85aa74f7998096b
Author: Mike Miller <mmil...@apache.org>
AuthorDate: Tue Nov 9 16:55:06 2021 -0500

    Create FlushNoFileIT and fix NPE in tablet file flush code (#2342)
    
    * Make flush methods return Optional<StoredTabletFile> to handle case
    when no file is written
    * Clean up a bunch of comments in flush code
    * Create FlushNoFileIT for testing flush that doesn't write out a file
    * Add checkFlushId to FunctionalTestUtils
    * Update TabletLogger.flushed() for when no file is written
    
    Co-authored-by: Keith Turner <ktur...@apache.org>
---
 .../apache/accumulo/core/logging/TabletLogger.java |   9 +-
 .../accumulo/server/util/ManagerMetadataUtil.java  |  21 ++-
 .../accumulo/tserver/tablet/DatafileManager.java   |  36 +++---
 .../org/apache/accumulo/tserver/tablet/Tablet.java |  10 +-
 .../accumulo/test/functional/FlushNoFileIT.java    | 142 +++++++++++++++++++++
 .../test/functional/FunctionalTestUtils.java       |  35 +++++
 6 files changed, 220 insertions(+), 33 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/logging/TabletLogger.java 
b/core/src/main/java/org/apache/accumulo/core/logging/TabletLogger.java
index ab8808a..7ab5341 100644
--- a/core/src/main/java/org/apache/accumulo/core/logging/TabletLogger.java
+++ b/core/src/main/java/org/apache/accumulo/core/logging/TabletLogger.java
@@ -22,11 +22,13 @@ import static java.util.stream.Collectors.toList;
 
 import java.util.Collection;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.client.admin.CompactionConfig;
 import org.apache.accumulo.core.client.admin.compaction.CompactableFile;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.metadata.StoredTabletFile;
 import org.apache.accumulo.core.metadata.TServerInstance;
 import org.apache.accumulo.core.metadata.TabletFile;
 import org.apache.accumulo.core.spi.compaction.CompactionJob;
@@ -143,8 +145,11 @@ public class TabletLogger {
         asFileNames(job.getFiles()));
   }
 
-  public static void flushed(KeyExtent extent, TabletFile newDatafile) {
-    fileLog.debug("Flushed {} created {} from [memory]", extent, newDatafile);
+  public static void flushed(KeyExtent extent, Optional<StoredTabletFile> 
newDatafile) {
+    if (newDatafile.isPresent())
+      fileLog.debug("Flushed {} created {} from [memory]", extent, 
newDatafile.get());
+    else
+      fileLog.debug("Flushed {} from [memory] but no file was written.", 
extent);
   }
 
   public static void bulkImported(KeyExtent extent, TabletFile file) {
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/util/ManagerMetadataUtil.java
 
b/server/base/src/main/java/org/apache/accumulo/server/util/ManagerMetadataUtil.java
index 9c557c6..3728377 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/util/ManagerMetadataUtil.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/util/ManagerMetadataUtil.java
@@ -215,23 +215,22 @@ public class ManagerMetadataUtil {
   }
 
   /**
-   * new data file update function adds one data file to a tablet's list
-   *
-   * @param path
-   *          should be relative to the table directory
-   *
+   * Update tablet file data from flush. Returns a StoredTabletFile if there 
are data entries.
    */
-  public static StoredTabletFile updateTabletDataFile(ServerContext context, 
KeyExtent extent,
-      TabletFile path, DataFileValue dfv, MetadataTime time, String address, 
ServiceLock zooLock,
-      Set<String> unusedWalLogs, TServerInstance lastLocation, long flushId) {
+  public static Optional<StoredTabletFile> updateTabletDataFile(ServerContext 
context,
+      KeyExtent extent, TabletFile newDatafile, DataFileValue dfv, 
MetadataTime time,
+      String address, ServiceLock zooLock, Set<String> unusedWalLogs, 
TServerInstance lastLocation,
+      long flushId) {
 
     TabletMutator tablet = context.getAmple().mutateTablet(extent);
-    StoredTabletFile newFile = null;
+    // if there are no entries, the path doesn't get stored in metadata table, 
only the flush ID
+    Optional<StoredTabletFile> newFile = Optional.empty();
 
+    // if entries are present, write the path to the metadata table
     if (dfv.getNumEntries() > 0) {
-      tablet.putFile(path, dfv);
+      tablet.putFile(newDatafile, dfv);
       tablet.putTime(time);
-      newFile = path.insert();
+      newFile = Optional.of(newDatafile.insert());
 
       TServerInstance self = getTServerInstance(address, zooLock);
       tablet.putLocation(self, LocationType.LAST);
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
index d17fcbc..8d9c33d 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
@@ -274,9 +274,14 @@ class DatafileManager {
     return newFiles.keySet();
   }
 
-  StoredTabletFile bringMinorCompactionOnline(TabletFile tmpDatafile, 
TabletFile newDatafile,
-      DataFileValue dfv, CommitSession commitSession, long flushId) {
-    StoredTabletFile newFile;
+  /**
+   * Returns Optional of the new file created. It is possible that the file 
was just flushed with no
+   * entries so was not inserted into the metadata. In this case empty is 
returned. If the file was
+   * stored in the metadata table, then StoredTabletFile will be returned.
+   */
+  Optional<StoredTabletFile> bringMinorCompactionOnline(TabletFile tmpDatafile,
+      TabletFile newDatafile, DataFileValue dfv, CommitSession commitSession, 
long flushId) {
+    Optional<StoredTabletFile> newFile;
     // rename before putting in metadata table, so files in metadata table 
should
     // always exist
     boolean attemptedRename = false;
@@ -284,6 +289,7 @@ class DatafileManager {
     do {
       try {
         if (dfv.getNumEntries() == 0) {
+          log.debug("No data entries so delete temporary file {}", 
tmpDatafile);
           vm.deleteRecursively(tmpDatafile.getPath());
         } else {
           if (!attemptedRename && vm.exists(newDatafile.getPath())) {
@@ -328,13 +334,9 @@ class DatafileManager {
     }
     try {
       // the order of writing to metadata and walog is important in the face 
of machine/process
-      // failures
-      // need to write to metadata before writing to walog, when things are 
done in the reverse
-      // order
-      // data could be lost... the minor compaction start even should be 
written before the
-      // following metadata
-      // write is made
-
+      // failures. Need to write to metadata before writing to walog; when 
things are done in the
+      // reverse order data could be lost... the minor compaction start event 
should be written
+      // before the following metadata write is made
       newFile = 
tablet.updateTabletDataFile(commitSession.getMaxCommittedTime(), newDatafile, 
dfv,
           unusedWalLogs, flushId);
 
@@ -362,8 +364,7 @@ class DatafileManager {
     do {
       try {
         // the purpose of making this update use the new commit session, 
instead of the old one
-        // passed in,
-        // is because the new one will reference the logs used by current 
memory...
+        // passed in, is because the new one will reference the logs used by 
current memory...
 
         tablet.getTabletServer().minorCompactionFinished(
             tablet.getTabletMemory().getCommitSession(), 
commitSession.getWALogSeq() + 2);
@@ -377,11 +378,12 @@ class DatafileManager {
     synchronized (tablet) {
       t1 = System.currentTimeMillis();
 
-      if (dfv.getNumEntries() > 0) {
-        if (datafileSizes.containsKey(newFile)) {
-          log.error("Adding file that is already in set {}", newFile);
+      if (dfv.getNumEntries() > 0 && newFile.isPresent()) {
+        StoredTabletFile newFileStored = newFile.get();
+        if (datafileSizes.containsKey(newFileStored)) {
+          log.error("Adding file that is already in set {}", newFileStored);
         }
-        datafileSizes.put(newFile, dfv);
+        datafileSizes.put(newFileStored, dfv);
       }
 
       tablet.flushComplete(flushId);
@@ -389,7 +391,7 @@ class DatafileManager {
       t2 = System.currentTimeMillis();
     }
 
-    TabletLogger.flushed(tablet.getExtent(), newDatafile);
+    TabletLogger.flushed(tablet.getExtent(), newFile);
 
     if (log.isTraceEnabled()) {
       log.trace(String.format("MinC finish lock %.2f secs %s", (t2 - t1) / 
1000.0,
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index da24549..c4597ec 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -35,6 +35,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
@@ -819,7 +820,7 @@ public class Tablet {
         var storedFile = 
getDatafileManager().bringMinorCompactionOnline(tmpDatafile, newDatafile,
             new DataFileValue(stats.getFileSize(), stats.getEntriesWritten()), 
commitSession,
             flushId);
-        compactable.filesAdded(true, List.of(storedFile));
+        storedFile.ifPresent(stf -> compactable.filesAdded(true, 
List.of(stf)));
       } catch (Exception e) {
         TraceUtil.setException(span2, e, true);
         throw e;
@@ -2204,8 +2205,11 @@ public class Tablet {
 
   }
 
-  public StoredTabletFile updateTabletDataFile(long maxCommittedTime, 
TabletFile newDatafile,
-      DataFileValue dfv, Set<String> unusedWalLogs, long flushId) {
+  /**
+   * Update tablet file data from flush. Returns a StoredTabletFile if there 
are data entries.
+   */
+  public Optional<StoredTabletFile> updateTabletDataFile(long maxCommittedTime,
+      TabletFile newDatafile, DataFileValue dfv, Set<String> unusedWalLogs, 
long flushId) {
     synchronized (timeLock) {
       if (maxCommittedTime > persistedTime) {
         persistedTime = maxCommittedTime;
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/FlushNoFileIT.java 
b/test/src/main/java/org/apache/accumulo/test/functional/FlushNoFileIT.java
new file mode 100644
index 0000000..855a2a5
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/FlushNoFileIT.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.IteratorUtil;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+import com.google.common.collect.Iterables;
+
+/**
+ * Tests that Accumulo will flush but not create a file that has 0 entries.
+ */
+public class FlushNoFileIT extends AccumuloClusterHarness {
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 60;
+  }
+
+  @Test
+  public void test() throws Exception {
+    try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
+
+      String tableName = getUniqueNames(1)[0];
+
+      NewTableConfiguration ntc = new NewTableConfiguration();
+      IteratorSetting iteratorSetting = new IteratorSetting(20, 
NullIterator.class);
+      ntc.attachIterator(iteratorSetting, 
EnumSet.of(IteratorUtil.IteratorScope.minc));
+      ntc.withSplits(new TreeSet<>(Set.of(new Text("a"), new Text("s"))));
+
+      c.tableOperations().create(tableName, ntc);
+      TableId tableId = 
TableId.of(c.tableOperations().tableIdMap().get(tableName));
+
+      try (BatchWriter bw = c.createBatchWriter(tableName)) {
+        Mutation m = new Mutation(new Text("r1"));
+        m.put("acf", tableName, "1");
+        bw.addMutation(m);
+      }
+
+      FunctionalTestUtils.checkRFiles(c, tableName, 3, 3, 0, 0);
+
+      c.tableOperations().flush(tableName, null, null, true);
+
+      FunctionalTestUtils.checkRFiles(c, tableName, 3, 3, 0, 0);
+
+      long flushId = FunctionalTestUtils.checkFlushId((ClientContext) c, 
tableId, 0);
+
+      try (BatchWriter bw = c.createBatchWriter(tableName)) {
+        Mutation m = new Mutation(new Text("r2"));
+        m.put("acf", tableName, "1");
+        bw.addMutation(m);
+      }
+
+      c.tableOperations().flush(tableName, null, null, true);
+
+      FunctionalTestUtils.checkRFiles(c, tableName, 3, 3, 0, 0);
+
+      long secondFlushId = FunctionalTestUtils.checkFlushId((ClientContext) c, 
tableId, flushId);
+      assertTrue("Flush ID did not change", secondFlushId > flushId);
+
+      try (Scanner scanner = c.createScanner(tableName)) {
+        assertEquals("Expected 0 Entries in table", 0, 
Iterables.size(scanner));
+      }
+    }
+  }
+
+  public static class NullIterator implements 
SortedKeyValueIterator<Key,Value> {
+
+    @Override
+    public void init(SortedKeyValueIterator<Key,Value> source, 
Map<String,String> options,
+        IteratorEnvironment env) {}
+
+    @Override
+    public boolean hasTop() {
+      return false;
+    }
+
+    @Override
+    public void next() throws IOException {}
+
+    @Override
+    public void seek(Range range, Collection<ByteSequence> columnFamilies, 
boolean inclusive) {}
+
+    @Override
+    public Key getTopKey() {
+      return null;
+    }
+
+    @Override
+    public Value getTopValue() {
+      return null;
+    }
+
+    @Override
+    public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) 
{
+      return null;
+    }
+  }
+}
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java
 
b/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java
index 6113c01..e73ee1e 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java
@@ -19,6 +19,7 @@
 package org.apache.accumulo.test.functional;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FLUSH_ID;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 
@@ -35,6 +36,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map.Entry;
+import java.util.OptionalLong;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
@@ -58,6 +60,8 @@ import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.fate.AdminUtil;
 import org.apache.accumulo.fate.AdminUtil.FateStatus;
@@ -218,4 +222,35 @@ public class FunctionalTestUtils {
       throw new RuntimeException(e);
     }
   }
+
+  /**
+   * Verify that flush ID gets updated properly and is the same for all 
tablets.
+   */
+  static long checkFlushId(ClientContext c, TableId tableId, long prevFlushID) 
throws Exception {
+    try (TabletsMetadata metaScan =
+        
c.getAmple().readTablets().forTable(tableId).fetch(FLUSH_ID).checkConsistency().build())
 {
+
+      long flushId = 0, prevTabletFlushId = 0;
+      for (TabletMetadata tabletMetadata : metaScan) {
+        OptionalLong optFlushId = tabletMetadata.getFlushId();
+        if (optFlushId.isPresent()) {
+          flushId = optFlushId.getAsLong();
+          if (prevTabletFlushId > 0 && prevTabletFlushId != flushId) {
+            throw new Exception("Flush ID different between tablets");
+          } else {
+            prevTabletFlushId = flushId;
+          }
+        } else {
+          throw new Exception("Missing flush ID");
+        }
+      }
+
+      if (prevFlushID >= flushId) {
+        throw new Exception(
+            "Flush ID did not increase. prevFlushID: " + prevFlushID + " 
current: " + flushId);
+      }
+
+      return flushId;
+    }
+  }
 }

Reply via email to