Github user anew commented on a diff in the pull request:
https://github.com/apache/incubator-tephra/pull/53#discussion_r138759678
--- Diff: tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java ---
@@ -85,105 +100,145 @@ public long getTimestamp() {
   @Override
   public void append(TransactionEdit edit) throws IOException {
-    long startTime = System.nanoTime();
-    synchronized (this) {
-      ensureAvailable();
-
-      Entry entry = new Entry(new LongWritable(logSequence.getAndIncrement()), edit);
-
-      // add to pending edits
-      append(entry);
-    }
-
-    // wait for sync to complete
-    sync();
-    long durationMillis = (System.nanoTime() - startTime) / 1000000L;
-    if (durationMillis > SLOW_APPEND_THRESHOLD) {
-      LOG.info("Slow append to log " + getName() + ", took " + durationMillis + " msec.");
-    }
+    append(Collections.singletonList(edit));
   }

   @Override
   public void append(List<TransactionEdit> edits) throws IOException {
-    long startTime = System.nanoTime();
-    synchronized (this) {
-      ensureAvailable();
-
+    if (closing) { // or closed, which implies closing
+      throw new IOException("Log " + getName() + " is closing or already closed, cannot append");
+    }
+    if (!initialized) {
+      init();
+    }
+    // synchronizing here ensures that elements in the queue are ordered by seq number
+    synchronized (logSequence) {
       for (TransactionEdit edit : edits) {
-        Entry entry = new Entry(new LongWritable(logSequence.getAndIncrement()), edit);
-
-        // add to pending edits
-        append(entry);
+        pendingWrites.add(new Entry(new LongWritable(logSequence.getAndIncrement()), edit));
       }
     }
-
-    // wait for sync to complete
+    // try to sync all pending edits (competing for this with other threads)
     sync();
-    long durationMillis = (System.nanoTime() - startTime) / 1000000L;
-    if (durationMillis > SLOW_APPEND_THRESHOLD) {
-      LOG.info("Slow append to log " + getName() + ", took " + durationMillis + " msec.");
-    }
   }

-  private void ensureAvailable() throws IOException {
-    if (closed) {
-      throw new IOException("Log " + getName() + " is already closed, cannot append!");
-    }
-    if (!initialized) {
-      init();
+  /**
+   * Return all pending writes at the time the method is called, or null if no writes are pending.
+   *
+   * Note that after this method returns, there can be additional pending writes,
+   * added concurrently while the existing pending writes are removed.
+   */
+  @Nullable
+  private Entry[] getPendingWrites() {
+    synchronized (this) {
+      if (pendingWrites.isEmpty()) {
+        return null;
+      }
+      Entry[] entriesToSync = new Entry[pendingWrites.size()];
+      for (int i = 0; i < entriesToSync.length; i++) {
+        entriesToSync[i] = pendingWrites.remove();
+      }
+      return entriesToSync;
     }
   }

-  /*
-   * Appends new writes to the pendingWrites. It is better to keep it in
-   * our own queue rather than writing it to the HDFS output stream because
-   * HDFSOutputStream.writeChunk is not lightweight at all.
+  /**
+   * When multiple threads try to log edits at the same time, they all will call {@link #append}
+   * followed by {@link #sync()}, concurrently. Hence, it can happen that multiple {@code append()}
+   * are followed by a single {@code sync}, or vice versa.
+   *
+   * We want to record the time and position of the first {@code append()} after a {@code sync()},
+   * then measure the time after the next {@code sync()}, and log a warning if it exceeds a threshold.
+   * Therefore this is called every time before we write the pending list out to the log writer.
+   *
+   * See {@link #stopTimer(TransactionLogWriter)}.
+   *
+   * @throws IOException if the position of the writer cannot be determined
    */
-  private void append(Entry e) throws IOException {
-    pendingWrites.add(e);
+  private void startTimerIfNeeded(TransactionLogWriter writer, int entryCount) throws IOException {
+    // no sync needed because this is only called within a sync block
+    if (positionBeforeWrite == -1L) {
+      positionBeforeWrite = writer.getPosition();
+      countSinceLastSync = 0;
+      stopWatch.reset().start();
+    }
+    countSinceLastSync += entryCount;
   }

-  // Returns all currently pending writes. New writes
-  // will accumulate in a new list.
-  private List<Entry> getPendingWrites() {
-    synchronized (this) {
-      List<Entry> save = this.pendingWrites;
-      this.pendingWrites = new LinkedList<>();
-      return save;
+  /**
+   * Called by a {@code sync()} after flushing to file system. Issues a warning if the write(s)+sync
+   * together exceed a threshold.
+   *
+   * See {@link #startTimerIfNeeded(TransactionLogWriter, int)}.
+   *
+   * @throws IOException if the position of the writer cannot be determined
+   */
+  private void stopTimer(TransactionLogWriter writer) throws IOException {
+    // this method is only called by a thread if it actually called sync(), inside a sync block
+    if (positionBeforeWrite != -1L) { // actually it should never be -1, but just in case
+      stopWatch.stop();
+      long elapsed = stopWatch.elapsedMillis();
+      long bytesWritten = writer.getPosition() - positionBeforeWrite;
+      if (elapsed >= slowAppendThreshold) {
+        LOG.info("Slow append to log {}, took {} ms for {} entr{} and {} bytes.",
+                 getName(), elapsed, countSinceLastSync, countSinceLastSync == 1 ? "y" : "ies", bytesWritten);
+      }
+      metricsCollector.histogram("wal.sync.size", countSinceLastSync);
+      metricsCollector.histogram("wal.sync.bytes", (int) bytesWritten); // single sync won't exceed max int
     }
+    positionBeforeWrite = -1L;
+    countSinceLastSync = 0;
   }

   private void sync() throws IOException {
     // writes out pending entries to the HLog
-    TransactionLogWriter tmpWriter = null;
     long latestSeq = 0;
     int entryCount = 0;
     synchronized (this) {
       if (closed) {
-        return;
-      }
-      // prevent writer being dereferenced
-      tmpWriter = writer;
-
-      List<Entry> currentPending = getPendingWrites();
-      if (!currentPending.isEmpty()) {
-        tmpWriter.commitMarker(currentPending.size());
+        if (pendingWrites.isEmpty()) {
+          // this is expected: close() sets closed to true after syncing all pending writes (including ours)
+          return;
+        }
+        // this should never happen because close() only sets closed=true after syncing.
+        // but if it should happen, we must fail this call because we don't know whether the edit was persisted
+        throw new IOException(
+          "Unexpected state: Writer is closed but there are pending edits. Cannot guarantee that edits were persisted");
       }
-
-      // write out all accumulated entries to log.
-      for (Entry e : currentPending) {
-        tmpWriter.append(e);
-        entryCount++;
-        latestSeq = Math.max(latestSeq, e.getKey().get());
+      Entry[] currentPending = getPendingWrites();
+      if (currentPending != null) {
+        entryCount = currentPending.length;
+        startTimerIfNeeded(writer, entryCount);
+        writer.commitMarker(entryCount);
+        for (Entry e : currentPending) {
+          writer.append(e);
+        }
+        // sequence numbers are guaranteed to be ascending, so the last one is the greatest
+        latestSeq = currentPending[currentPending.length - 1].getKey().get();
+        writtenUpTo = latestSeq;
       }
     }
-    long lastSynced = syncedUpTo.get();
+    // giving up the sync lock here allows other threads to write their edits before the sync happens.
+    // hence, we can have the edits from n threads in one sync.
+
     // someone else might have already synced our edits, avoid double syncing
-    if (lastSynced < latestSeq) {
-      tmpWriter.sync();
-      metricsCollector.histogram("wal.sync.size", entryCount);
-      syncedUpTo.compareAndSet(lastSynced, latestSeq);
+    if (syncedUpTo < latestSeq) {
--- End diff ---
Please see the previous discussion of this at https://github.com/apache/incubator-tephra/pull/53#pullrequestreview-62374698
---