Github user poornachandra commented on a diff in the pull request:
https://github.com/apache/incubator-tephra/pull/53#discussion_r138724401
--- Diff:
tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java
---
@@ -134,57 +124,112 @@ private void ensureAvailable() throws IOException {
}
}
- /*
- * Appends new writes to the pendingWrites. It is better to keep it in
- * our own queue rather than writing it to the HDFS output stream because
- * HDFSOutputStream.writeChunk is not lightweight at all.
+ /**
+ * Return all pending writes at the time the method is called, or null
if no writes are pending.
+ *
+ * Note that after this method returns, there can be additional pending
writes,
+ * added concurrently while the existing pending writes are removed.
+ */
+ @Nullable
+ private Entry[] getPendingWrites() {
+ synchronized (this) {
+ if (pendingWrites.isEmpty()) {
+ return null;
+ }
+ Entry[] entriesToSync = new Entry[pendingWrites.size()];
+ for (int i = 0; i < entriesToSync.length; i++) {
+ entriesToSync[i] = pendingWrites.remove();
+ }
+ return entriesToSync;
+ }
+ }
+
+ /**
+ * When multiple threads try to log edits at the same time, they all
will call {@link #append}
+ * followed by {@link #sync()}, concurrently. Hence, it can happen that
multiple {@code append()}
+ * are followed by a single {@code sync}, or vice versa.
+ *
+ * We want to record the time and position of the first {@code append()}
after a {@code sync()},
+ * then measure the time after the next {@code sync()}, and log a
warning if it exceeds a threshold.
+ * Therefore this is called every time before we write the pending list
out to the log writer.
+ *
+ * See {@link #stopTimerIfNeeded(TransactionLogWriter)}.
+ *
+ * @throws IOException if the position of the writer cannot be determined
*/
- private void append(Entry e) throws IOException {
- pendingWrites.add(e);
+ private void startTimerIfNeeded(TransactionLogWriter writer, int
entryCount) throws IOException {
+ // no sync needed because this is only called within a sync block
+ if (positionBeforeWrite == -1L) {
+ positionBeforeWrite = writer.getPosition();
+ countSinceLastSync = 0;
+ stopWatch.reset().start();
+ }
+ countSinceLastSync += entryCount;
}
- // Returns all currently pending writes. New writes
- // will accumulate in a new list.
- private List<Entry> getPendingWrites() {
- synchronized (this) {
- List<Entry> save = this.pendingWrites;
- this.pendingWrites = new LinkedList<>();
- return save;
+ /**
+ * Called by a {@code sync()} after flushing to file system. Issues a
warning if the write(s)+sync
+ * together exceed a threshold.
+ *
+ * See {@link #startTimerIfNeeded(TransactionLogWriter, int)}.
+ *
+ * @throws IOException if the position of the writer cannot be determined
+ */
+ private void stopTimerIfNeeded(TransactionLogWriter writer) throws
IOException {
+ // this method is only called by a thread if it actually called
sync(), inside a sync block
+ if (positionBeforeWrite != -1L) { // actually it should never be -1,
but just in case
+ stopWatch.stop();
+ long elapsed = stopWatch.elapsedMillis();
+ if (elapsed >= slowAppendThreshold) {
+ long currentPosition = writer.getPosition();
+ long bytesWritten = currentPosition - positionBeforeWrite;
+ LOG.info("Slow append to log {}, took {} ms for {} entr{} and {}
bytes.",
+ getName(), elapsed, countSinceLastSync,
countSinceLastSync == 1 ? "y" : "ies", bytesWritten);
+ }
}
+ positionBeforeWrite = -1L;
+ countSinceLastSync = 0;
}
private void sync() throws IOException {
// writes out pending entries to the HLog
- TransactionLogWriter tmpWriter = null;
long latestSeq = 0;
int entryCount = 0;
synchronized (this) {
if (closed) {
return;
--- End diff --
It would be better if this threw an exception to indicate that the entries
could not be synced. Silently returning here reports success to the clients
even though the entries were not synced.
---