Github user poornachandra commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/53#discussion_r138724401
  
    --- Diff: 
tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java 
---
    @@ -134,57 +124,112 @@ private void ensureAvailable() throws IOException {
         }
       }
     
    -  /*
    -   * Appends new writes to the pendingWrites. It is better to keep it in
    -   * our own queue rather than writing it to the HDFS output stream because
    -   * HDFSOutputStream.writeChunk is not lightweight at all.
    +  /**
    +   * Return all pending writes at the time the method is called, or null 
if no writes are pending.
    +   *
    +   * Note that after this method returns, there can be additional pending 
writes,
    +   * added concurrently while the existing pending writes are removed.
    +   */
    +  @Nullable
    +  private Entry[] getPendingWrites() {
    +    synchronized (this) {
    +      if (pendingWrites.isEmpty()) {
    +        return null;
    +      }
    +      Entry[] entriesToSync = new Entry[pendingWrites.size()];
    +      for (int i = 0; i < entriesToSync.length; i++) {
    +        entriesToSync[i] = pendingWrites.remove();
    +      }
    +      return entriesToSync;
    +    }
    +  }
    +
    +  /**
    +   * When multiple threads try to log edits at the same time, they all 
will call (@link #append}
    +   * followed by {@link #sync()}, concurrently. Hence, it can happen that 
multiple {@code append()}
    +   * are followed by a single {@code sync}, or vice versa.
    +   *
    +   * We want to record the time and position of the first {@code append()} 
after a {@code sync()},
    +   * then measure the time after the next {@code sync()}, and log a 
warning if it exceeds a threshold.
    +   * Therefore this is called every time before we write the pending list 
out to the log writer.
    +   *
    +   * See {@link #stopTimerIfNeeded(TransactionLogWriter)}.
    +   *
    +   * @throws IOException if the position of the writer cannot be determined
        */
    -  private void append(Entry e) throws IOException {
    -    pendingWrites.add(e);
    +  private void startTimerIfNeeded(TransactionLogWriter writer, int 
entryCount) throws IOException {
    +    // no sync needed because this is only called within a sync block
    +    if (positionBeforeWrite == -1L) {
    +      positionBeforeWrite = writer.getPosition();
    +      countSinceLastSync = 0;
    +      stopWatch.reset().start();
    +    }
    +    countSinceLastSync += entryCount;
       }
     
    -  // Returns all currently pending writes. New writes
    -  // will accumulate in a new list.
    -  private List<Entry> getPendingWrites() {
    -    synchronized (this) {
    -      List<Entry> save = this.pendingWrites;
    -      this.pendingWrites = new LinkedList<>();
    -      return save;
    +  /**
    +   * Called by a {@code sync()} after flushing to file system. Issues a 
warning if the write(s)+sync
    +   * together exceed a threshold.
    +   *
    +   * See {@link #startTimerIfNeeded(TransactionLogWriter, int)}.
    +   *
    +   * @throws IOException if the position of the writer cannot be determined
    +   */
    +  private void stopTimerIfNeeded(TransactionLogWriter writer) throws 
IOException {
    +    // this method is only called by a thread if it actually called 
sync(), inside a sync block
    +    if (positionBeforeWrite != -1L) { // actually it should never be -1, 
but just in case
    +      stopWatch.stop();
    +      long elapsed = stopWatch.elapsedMillis();
    +      if (elapsed >= slowAppendThreshold) {
    +        long currentPosition = writer.getPosition();
    +        long bytesWritten = currentPosition - positionBeforeWrite;
    +        LOG.info("Slow append to log {}, took {} ms for {} entr{} and {} 
bytes.",
    +                 getName(), elapsed, countSinceLastSync, 
countSinceLastSync == 1 ? "y" : "ies", bytesWritten);
    +      }
         }
    +    positionBeforeWrite = -1L;
    +    countSinceLastSync = 0;
       }
     
       private void sync() throws IOException {
         // writes out pending entries to the HLog
    -    TransactionLogWriter tmpWriter = null;
         long latestSeq = 0;
         int entryCount = 0;
         synchronized (this) {
           if (closed) {
             return;
    --- End diff --
    
    It would be good if this throws an exception to indicate that the entries 
could not be synced. Silently returning will return success to the clients.


---

Reply via email to