This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/master by this push:
     new 0903aad  Added detailed trace logging for FATE #1316 (#1527)
0903aad is described below

commit 0903aad008c5d03f1c3e3640152a9a35646eeb12
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Wed Feb 26 19:38:35 2020 -0500

    Added detailed trace logging for FATE #1316 (#1527)
---
 .../apache/accumulo/core/logging/FateLogger.java   | 137 +++++++++++++++++++++
 .../main/java/org/apache/accumulo/fate/Fate.java   |  10 +-
 server/master/pom.xml                              |   4 +
 .../java/org/apache/accumulo/master/Master.java    |   3 +-
 .../apache/accumulo/master/tableOps/TraceRepo.java |  18 +++
 5 files changed, 169 insertions(+), 3 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java 
b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java
new file mode 100644
index 0000000..f725202
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.accumulo.core.logging;
+
+import static org.apache.accumulo.fate.FateTxId.formatTid;
+
+import java.io.Serializable;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.function.Function;
+
+import org.apache.accumulo.fate.ReadOnlyRepo;
+import org.apache.accumulo.fate.Repo;
+import org.apache.accumulo.fate.StackOverflowException;
+import org.apache.accumulo.fate.TStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FateLogger {
+  private static final String PREFIX = Logging.PREFIX + "fate.";
+
+  // Logs all mutations to FATE's persistent storage. Enabling this logger could help debug
+  // reproducible problems with FATE transactions.
+  private static final Logger storeLog = LoggerFactory.getLogger(PREFIX + 
"store");
+
+  public static <T> TStore<T> wrap(TStore<T> store, Function<Repo<T>,String> 
toLogString) {
+
+    // only logging operations that change the persisted data, not operations 
that only read data
+    return new TStore<T>() {
+
+      @Override
+      public long reserve() {
+        return store.reserve();
+      }
+
+      @Override
+      public void reserve(long tid) {
+        store.reserve(tid);
+      }
+
+      @Override
+      public void unreserve(long tid, long deferTime) {
+        store.unreserve(tid, deferTime);
+      }
+
+      @Override
+      public List<ReadOnlyRepo<T>> getStack(long tid) {
+        return store.getStack(tid);
+      }
+
+      @Override
+      public TStatus getStatus(long tid) {
+        return store.getStatus(tid);
+      }
+
+      @Override
+      public TStatus waitForStatusChange(long tid, EnumSet<TStatus> expected) {
+        return store.waitForStatusChange(tid, expected);
+      }
+
+      @Override
+      public Serializable getProperty(long tid, String prop) {
+        return store.getProperty(tid, prop);
+      }
+
+      @Override
+      public List<Long> list() {
+        return store.list();
+      }
+
+      @Override
+      public long create() {
+        long tid = store.create();
+        if (storeLog.isTraceEnabled())
+          storeLog.trace("created {}", formatTid(tid));
+        return tid;
+      }
+
+      @Override
+      public Repo<T> top(long tid) {
+        return store.top(tid);
+      }
+
+      @Override
+      public void push(long tid, Repo<T> repo) throws StackOverflowException {
+        store.push(tid, repo);
+        if (storeLog.isTraceEnabled())
+          storeLog.trace("pushed {} {}", formatTid(tid), 
toLogString.apply(repo));
+      }
+
+      @Override
+      public void pop(long tid) {
+        store.pop(tid);
+        if (storeLog.isTraceEnabled())
+          storeLog.trace("popped {}", formatTid(tid));
+      }
+
+      @Override
+      public void setStatus(long tid, TStatus status) {
+        store.setStatus(tid, status);
+        if (storeLog.isTraceEnabled())
+          storeLog.trace("setStatus {} {}", formatTid(tid), status);
+      }
+
+      @Override
+      public void setProperty(long tid, String prop, Serializable val) {
+        store.setProperty(tid, prop, val);
+        if (storeLog.isTraceEnabled())
+          storeLog.trace("setProperty {} {} {}", formatTid(tid), prop, val);
+      }
+
+      @Override
+      public void delete(long tid) {
+        store.delete(tid);
+        if (storeLog.isTraceEnabled())
+          storeLog.trace("deleted {}", formatTid(tid));
+      }
+    };
+  }
+}
diff --git a/core/src/main/java/org/apache/accumulo/fate/Fate.java 
b/core/src/main/java/org/apache/accumulo/fate/Fate.java
index 1e42ee3..3e4601d 100644
--- a/core/src/main/java/org/apache/accumulo/fate/Fate.java
+++ b/core/src/main/java/org/apache/accumulo/fate/Fate.java
@@ -25,7 +25,9 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
 
+import org.apache.accumulo.core.logging.FateLogger;
 import org.apache.accumulo.core.util.ShutdownUtil;
 import org.apache.accumulo.fate.ReadOnlyTStore.TStatus;
 import org.apache.accumulo.fate.util.LoggingRunnable;
@@ -211,9 +213,12 @@ public class Fate<T> {
    * <p>
    * Note: Users of this class should call {@link 
#startTransactionRunners(int)} to launch the
    * worker threads after creating a Fate object.
+   *
+   * @param toLogStrFunc
+   *          A function that converts a Repo to a String that is suitable for logging
    */
-  public Fate(T environment, TStore<T> store) {
-    this.store = store;
+  public Fate(T environment, TStore<T> store, Function<Repo<T>,String> 
toLogStrFunc) {
+    this.store = FateLogger.wrap(store, toLogStrFunc);
     this.environment = environment;
   }
 
@@ -251,6 +256,7 @@ public class Fate<T> {
             // this should not happen
             throw new RuntimeException(e);
           }
+
         }
 
         if (autoCleanUp)
diff --git a/server/master/pom.xml b/server/master/pom.xml
index b65f4bf..14eaa88 100644
--- a/server/master/pom.xml
+++ b/server/master/pom.xml
@@ -41,6 +41,10 @@
       <optional>true</optional>
     </dependency>
     <dependency>
+      <groupId>com.google.code.gson</groupId>
+      <artifactId>gson</artifactId>
+    </dependency>
+    <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
     </dependency>
diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java 
b/server/master/src/main/java/org/apache/accumulo/master/Master.java
index ace9356..0600357 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java
@@ -93,6 +93,7 @@ import 
org.apache.accumulo.master.replication.MasterReplicationCoordinator;
 import org.apache.accumulo.master.replication.ReplicationDriver;
 import org.apache.accumulo.master.replication.WorkDriver;
 import org.apache.accumulo.master.state.TableCounts;
+import org.apache.accumulo.master.tableOps.TraceRepo;
 import org.apache.accumulo.master.upgrade.UpgradeCoordinator;
 import org.apache.accumulo.server.AbstractServer;
 import org.apache.accumulo.server.HighlyAvailableService;
@@ -1117,7 +1118,7 @@ public class Master extends AbstractServer
 
       int threads = 
getConfiguration().getCount(Property.MASTER_FATE_THREADPOOL_SIZE);
 
-      fate = new Fate<>(this, store);
+      fate = new Fate<>(this, store, TraceRepo::toLogString);
       fate.startTransactionRunners(threads);
 
       SimpleTimer.getInstance(getConfiguration()).schedule(() -> 
store.ageOff(), 63000, 63000);
diff --git 
a/server/master/src/main/java/org/apache/accumulo/master/tableOps/TraceRepo.java
 
b/server/master/src/main/java/org/apache/accumulo/master/tableOps/TraceRepo.java
index 5cbd562..21042f6 100644
--- 
a/server/master/src/main/java/org/apache/accumulo/master/tableOps/TraceRepo.java
+++ 
b/server/master/src/main/java/org/apache/accumulo/master/tableOps/TraceRepo.java
@@ -21,8 +21,11 @@ package org.apache.accumulo.master.tableOps;
 import org.apache.accumulo.core.trace.TraceUtil;
 import org.apache.accumulo.core.trace.thrift.TInfo;
 import org.apache.accumulo.fate.Repo;
+import org.apache.accumulo.master.Master;
 import org.apache.htrace.TraceScope;
 
+import com.google.gson.Gson;
+
 public class TraceRepo<T> implements Repo<T> {
 
   private static final long serialVersionUID = 1L;
@@ -72,4 +75,19 @@ public class TraceRepo<T> implements Repo<T> {
     return repo.getReturn();
   }
 
+  /**
+   * @return string version of Repo that is suitable for logging
+   */
+  public static String toLogString(Repo<Master> repo) {
+    if (repo instanceof TraceRepo) {
+      // There are two reasons the repo is unwrapped. First, I could not figure out how to get this
+      // to work with Gson. Gson kept serializing nothing for the generic pointer TraceRepo.repo.
+      // Second, I thought this information was not useful for logging.
+      repo = ((TraceRepo<Master>) repo).repo;
+    }
+
+    // In order for Gson to work with generic types, the following passes repo.getClass() to Gson.
+    // See the Gson javadoc for more info.
+    return repo.getClass() + " " + new Gson().toJson(repo, repo.getClass());
+  }
 }

Reply via email to