[
https://issues.apache.org/jira/browse/TEPHRA-243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16165203#comment-16165203
]
ASF GitHub Bot commented on TEPHRA-243:
---------------------------------------
Github user poornachandra commented on a diff in the pull request:
https://github.com/apache/incubator-tephra/pull/53#discussion_r138720628
--- Diff:
tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java
---
@@ -134,57 +124,112 @@ private void ensureAvailable() throws IOException {
}
}
- /*
- * Appends new writes to the pendingWrites. It is better to keep it in
- * our own queue rather than writing it to the HDFS output stream because
- * HDFSOutputStream.writeChunk is not lightweight at all.
+ /**
+ * Return all pending writes at the time the method is called, or null if no writes are pending.
+ *
+ * Note that after this method returns, there can be additional pending writes,
+ * added concurrently while the existing pending writes are removed.
+ */
+ @Nullable
+ private Entry[] getPendingWrites() {
+ synchronized (this) {
+ if (pendingWrites.isEmpty()) {
+ return null;
+ }
+ Entry[] entriesToSync = new Entry[pendingWrites.size()];
+ for (int i = 0; i < entriesToSync.length; i++) {
+ entriesToSync[i] = pendingWrites.remove();
+ }
+ return entriesToSync;
+ }
+ }
+
+ /**
+ * When multiple threads try to log edits at the same time, they all will call (@link #append}
+ * followed by {@link #sync()}, concurrently. Hence, it can happen that multiple {@code append()}
+ * are followed by a single {@code sync}, or vice versa.
+ *
+ * We want to record the time and position of the first {@code append()} after a {@code sync()},
+ * then measure the time after the next {@code sync()}, and log a warning if it exceeds a threshold.
+ * Therefore this is called every time before we write the pending list out to the log writer.
+ *
+ * See {@link #stopTimerIfNeeded(TransactionLogWriter)}.
+ *
+ * @throws IOException if the position of the writer cannot be determined
*/
- private void append(Entry e) throws IOException {
- pendingWrites.add(e);
+ private void startTimerIfNeeded(TransactionLogWriter writer, int entryCount) throws IOException {
+ // no sync needed because this is only called within a sync block
+ if (positionBeforeWrite == -1L) {
+ positionBeforeWrite = writer.getPosition();
+ countSinceLastSync = 0;
+ stopWatch.reset().start();
+ }
+ countSinceLastSync += entryCount;
}
- // Returns all currently pending writes. New writes
- // will accumulate in a new list.
- private List<Entry> getPendingWrites() {
- synchronized (this) {
- List<Entry> save = this.pendingWrites;
- this.pendingWrites = new LinkedList<>();
- return save;
+ /**
+ * Called by a {@code sync()} after flushing to file system. Issues a warning if the write(s)+sync
+ * together exceed a threshold.
+ *
+ * See {@link #startTimerIfNeeded(TransactionLogWriter, int)}.
+ *
+ * @throws IOException if the position of the writer cannot be determined
+ */
+ private void stopTimerIfNeeded(TransactionLogWriter writer) throws IOException {
+ // this method is only called by a thread if it actually called sync(), inside a sync block
+ if (positionBeforeWrite != -1L) { // actually it should never be -1, but just in case
+ stopWatch.stop();
+ long elapsed = stopWatch.elapsedMillis();
+ if (elapsed >= slowAppendThreshold) {
+ long currentPosition = writer.getPosition();
+ long bytesWritten = currentPosition - positionBeforeWrite;
+ LOG.info("Slow append to log {}, took {} ms for {} entr{} and {} bytes.",
+ getName(), elapsed, countSinceLastSync, countSinceLastSync == 1 ? "y" : "ies", bytesWritten);
+ }
}
+ positionBeforeWrite = -1L;
+ countSinceLastSync = 0;
}
private void sync() throws IOException {
// writes out pending entries to the HLog
- TransactionLogWriter tmpWriter = null;
long latestSeq = 0;
int entryCount = 0;
synchronized (this) {
if (closed) {
return;
}
- // prevent writer being dereferenced
- tmpWriter = writer;
-
- List<Entry> currentPending = getPendingWrites();
- if (!currentPending.isEmpty()) {
- tmpWriter.commitMarker(currentPending.size());
- }
-
- // write out all accumulated entries to log.
- for (Entry e : currentPending) {
- tmpWriter.append(e);
- entryCount++;
- latestSeq = Math.max(latestSeq, e.getKey().get());
+ Entry[] currentPending = getPendingWrites();
+ if (currentPending != null) {
+ entryCount = currentPending.length;
+ startTimerIfNeeded(writer, entryCount);
+ writer.commitMarker(entryCount);
+ for (Entry e : currentPending) {
+ writer.append(e);
+ }
+ // sequence are guaranteed to be ascending, so the last one is the greatest
+ latestSeq = currentPending[currentPending.length - 1].getKey().get();
+ writtenUpTo = latestSeq;
}
}
- long lastSynced = syncedUpTo.get();
+ // giving up the sync lock here allows other threads to write their edits before the sync happens.
+ // hence, we can have the edits from n threads in one sync.
+
// someone else might have already synced our edits, avoid double syncing
- if (lastSynced < latestSeq) {
- tmpWriter.sync();
- metricsCollector.histogram("wal.sync.size", entryCount);
- syncedUpTo.compareAndSet(lastSynced, latestSeq);
+ if (syncedUpTo < latestSeq) {
+ synchronized (this) {
+ // someone else might have synced our edits while we were waiting
+ if (syncedUpTo < latestSeq) {
+ writer.sync();
+ syncedUpTo = writtenUpTo;
+ stopTimerIfNeeded(writer);
+ }
+ }
}
+ // in any case, emit metrics for the number entries we wrote.
+ // because the thread that actually syncs does not know how many it synced (it not write all of them)
+ metricsCollector.histogram("wal.sync.size", entryCount);
--- End diff --
Can we emit metrics for sync bytes then?
> When transaction log sync is slow, the warning message should contain more information
> --------------------------------------------------------------------------------------
>
> Key: TEPHRA-243
> URL: https://issues.apache.org/jira/browse/TEPHRA-243
> Project: Tephra
> Issue Type: Improvement
> Affects Versions: 0.12.0-incubating
> Reporter: Andreas Neumann
> Assignee: Andreas Neumann
> Fix For: 0.13.0-incubating
>
>
> Currently we get this message:
> {noformat}
> 2017-08-12 00:59:46,938 - INFO [TTransactionServer-rpc-857:o.a.t.p.AbstractTransactionLog@102] - Slow append to log txlog.1502517541689, took 1431 msec.
> {noformat}
> It would be more useful to know how many bytes were written and how many edits were in this sync.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)