[ https://issues.apache.org/jira/browse/HDFS-14437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
angerszhu updated HDFS-14437:
-----------------------------
Attachment: HDFS-14437.reproductionwithlog.patch
HDFS-14437.reproduction.patch
> Exception happened when rollEditLog expects empty
> EditsDoubleBuffer.bufCurrent but not
> -----------------------------------------------------------------------------------------
>
> Key: HDFS-14437
> URL: https://issues.apache.org/jira/browse/HDFS-14437
> Project: Hadoop HDFS
> Issue Type: Bug
> Components: ha, namenode, qjm
> Reporter: angerszhu
> Priority: Major
> Attachments: HDFS-14437.reproduction.patch,
> HDFS-14437.reproductionwithlog.patch, screenshot-1.png
>
>
> For the problem reported in https://issues.apache.org/jira/browse/HDFS-10943,
> I traced how the EditLog is written and flushed and went through the key
> functions involved. In the FSEditLog class, the close() function performs the
> following sequence:
>
> {code:java}
> waitForSyncToFinish();
> endCurrentLogSegment(true);
> {code}
> Because close() holds the object lock, by the time waitForSyncToFinish()
> returns every logSync() has completed and all data in bufReady has been
> flushed. The current thread still holds the lock when it calls
> endCurrentLogSegment(), so no other thread can acquire it, and therefore no
> new edits can be written into bufCurrent.
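> The following minimal, hypothetical Java model makes the ordering argument concrete. It is not Hadoop code. `CloseOrderingModel`, `beginSync()`, `endSync()` and `runScenario()` are invented names, and `beginSync()`/`endSync()` stand in for steps 1 and 3 of logSync(). Because close() holds the monitor and `waitForSyncToFinish()` loops on `isSyncRunning`, the segment can only end after an in-flight flush has finished.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, simplified model of the locking pattern in FSEditLog.close():
 * a flush runs without the lock between beginSync() and endSync(), and close()
 * holds the lock while waiting for that flush before ending the segment.
 */
class CloseOrderingModel {
    private boolean isSyncRunning = false;          // guarded by "this"
    private final List<String> events = new ArrayList<>();

    /** Stand-in for logSync() step 1 (under the lock): mark a flush as in progress. */
    synchronized void beginSync() {
        isSyncRunning = true;
        events.add("sync-start");
    }

    /** Stand-in for logSync() step 3 (under the lock): clear the flag and wake waiters. */
    synchronized void endSync() {
        events.add("sync-end");
        isSyncRunning = false;
        notifyAll();
    }

    /** Waits until no unsynchronized flush is running; wait() releases the lock meanwhile. */
    synchronized void waitForSyncToFinish() {
        boolean interrupted = false;
        while (isSyncRunning) {
            try {
                wait(1000);
            } catch (InterruptedException ie) {
                interrupted = true;                 // keep waiting, restore the flag afterwards
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /** Mirrors close(): after the wait, no sync can start until this method releases the lock. */
    synchronized void close() {
        waitForSyncToFinish();
        events.add("end-segment");
    }

    synchronized List<String> events() {
        return new ArrayList<>(events);
    }

    /** A flush is already in progress when close() is called; the segment must end after it. */
    static List<String> runScenario() {
        CloseOrderingModel log = new CloseOrderingModel();
        log.beginSync();
        Thread flusher = new Thread(() -> {
            // step 2 (the actual flush) would run here, outside the lock
            log.endSync();
        });
        flusher.start();
        log.close();
        try {
            flusher.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log.events();
    }

    public static void main(String[] args) {
        System.out.println(runScenario()); // [sync-start, sync-end, end-segment]
    }
}
```

> `wait()` releases the monitor, which is what lets the flushing thread run step 3 while close() is still waiting.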
> However, if waitForSyncToFinish() is not called before endCurrentLogSegment(),
> an auto-scheduled logSync() may still be in the middle of its flush. That step
> runs without synchronization, as the comment on the logSync() method explains:
>
> {code:java}
> /**
>  * Sync all modifications done by this thread.
>  *
>  * The internal concurrency design of this class is as follows:
>  *   - Log items are written synchronized into an in-memory buffer,
>  *     and each assigned a transaction ID.
>  *   - When a thread (client) would like to sync all of its edits, logSync()
>  *     uses a ThreadLocal transaction ID to determine what edit number must
>  *     be synced to.
>  *   - The isSyncRunning volatile boolean tracks whether a sync is currently
>  *     under progress.
>  *
>  * The data is double-buffered within each edit log implementation so that
>  * in-memory writing can occur in parallel with the on-disk writing.
>  *
>  * Each sync occurs in three steps:
>  *   1. synchronized, it swaps the double buffer and sets the isSyncRunning
>  *      flag.
>  *   2. unsynchronized, it flushes the data to storage
>  *   3. synchronized, it resets the flag and notifies anyone waiting on the
>  *      sync.
>  *
>  * The lack of synchronization on step 2 allows other threads to continue
>  * to write into the memory buffer while the sync is in progress.
>  * Because this step is unsynchronized, actions that need to avoid
>  * concurrency with sync() should be synchronized and also call
>  * waitForSyncToFinish() before assuming they are running alone.
>  */
> public void logSync() {
>   long syncStart = 0;
>   // Fetch the transactionId of this thread.
>   long mytxid = myTransactionId.get().txid;
>
>   boolean sync = false;
>   try {
>     EditLogOutputStream logStream = null;
>     synchronized (this) {
>       try {
>         printStatistics(false);
>         // if somebody is already syncing, then wait
>         while (mytxid > synctxid && isSyncRunning) {
>           try {
>             wait(1000);
>           } catch (InterruptedException ie) {
>           }
>         }
>         //
>         // If this transaction was already flushed, then nothing to do
>         //
>         if (mytxid <= synctxid) {
>           numTransactionsBatchedInSync++;
>           if (metrics != null) {
>             // Metrics is non-null only when used inside name node
>             metrics.incrTransactionsBatchedInSync();
>           }
>           return;
>         }
>
>         // now, this thread will do the sync
>         syncStart = txid;
>         isSyncRunning = true;
>         sync = true;
>         // swap buffers
>         try {
>           if (journalSet.isEmpty()) {
>             throw new IOException("No journals available to flush");
>           }
>           editLogStream.setReadyToFlush();
>         } catch (IOException e) {
>           final String msg =
>               "Could not sync enough journals to persistent storage " +
>               "due to " + e.getMessage() + ". " +
>               "Unsynced transactions: " + (txid - synctxid);
>           LOG.fatal(msg, new Exception());
>           synchronized(journalSetLock) {
>             IOUtils.cleanup(LOG, journalSet);
>           }
>           terminate(1, msg);
>         }
>       } finally {
>         // Prevent RuntimeException from blocking other log edit write
>         doneWithAutoSyncScheduling();
>       }
>       //editLogStream may become null,
>       //so store a local variable for flush.
>       logStream = editLogStream;
>     }
>
>     // do the sync
>     long start = now();
>     try {
>       if (logStream != null) {
>         logStream.flush();
>       }
>     } catch (IOException ex) {
>       synchronized (this) {
>         final String msg =
>             "Could not sync enough journals to persistent storage. "
>             + "Unsynced transactions: " + (txid - synctxid);
>         LOG.fatal(msg, new Exception());
>         synchronized(journalSetLock) {
>           IOUtils.cleanup(LOG, journalSet);
>         }
>         terminate(1, msg);
>       }
>     }
>     long elapsed = now() - start;
>     if (metrics != null) { // Metrics non-null only when used inside name node
>       metrics.addSync(elapsed);
>     }
>
>   } finally {
>     // Prevent RuntimeException from blocking other log edit sync
>     synchronized (this) {
>       if (sync) {
>         synctxid = syncStart;
>         for (JournalManager jm : journalSet.getJournalManagers()) {
>           /**
>            * {@link FileJournalManager#lastReadableTxId} is only meaningful
>            * for file-based journals. Therefore the interface is not added to
>            * other types of {@link JournalManager}.
>            */
>           if (jm instanceof FileJournalManager) {
>             ((FileJournalManager)jm).setLastReadableTxId(syncStart);
>           }
>         }
>         isSyncRunning = false;
>       }
>       this.notifyAll();
>     }
>   }
> }
> {code}
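> The double buffering that step 2 relies on can be sketched as a single-threaded Java model with simplified semantics. `DoubleBufferModel` and its methods are hypothetical, two `ByteArrayOutputStream`s stand in for bufCurrent and bufReady, and locking is left out. The model shows that an edit written after the swap (step 1) but before the flush (step 2) lands in bufCurrent and is not part of that flush.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical single-threaded model of a double-buffered edit stream.
 * Locking is omitted; in FSEditLog it is provided by the logSync() protocol.
 */
class DoubleBufferModel {
    private ByteArrayOutputStream bufCurrent = new ByteArrayOutputStream();
    private ByteArrayOutputStream bufReady = new ByteArrayOutputStream();
    private final ByteArrayOutputStream storage = new ByteArrayOutputStream(); // stands in for disk/journals

    /** Appends one edit to the in-memory buffer (done under the FSEditLog lock in HDFS). */
    void write(String edit) {
        byte[] b = edit.getBytes(StandardCharsets.UTF_8);
        bufCurrent.write(b, 0, b.length);
    }

    /** Step 1: swap the buffers so writers continue into the (empty) other buffer. */
    void setReadyToFlush() {
        ByteArrayOutputStream tmp = bufReady;
        bufReady = bufCurrent;
        bufCurrent = tmp;
    }

    /** Step 2: copy bufReady to storage and empty it (runs without the lock in logSync()). */
    void flushReady() {
        byte[] data = bufReady.toByteArray();
        storage.write(data, 0, data.length);
        bufReady.reset();
    }

    String stored() {
        return new String(storage.toByteArray(), StandardCharsets.UTF_8);
    }

    String pending() {
        return new String(bufCurrent.toByteArray(), StandardCharsets.UTF_8);
    }

    /** Replays one sync during which another writer appends an edit. */
    static String demo() {
        DoubleBufferModel s = new DoubleBufferModel();
        s.write("op1;");
        s.write("op2;");
        s.setReadyToFlush();   // step 1: "op1;op2;" moves to bufReady
        s.write("op3;");       // another writer appends while the flush is in progress
        s.flushReady();        // step 2: only "op1;op2;" reaches storage
        return "stored=" + s.stored() + " pending=" + s.pending();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // stored=op1;op2; pending=op3;
    }
}
```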
> So if waitForSyncToFinish() is not called before endCurrentLogSegment(),
> nothing guarantees that no flush is in progress while endCurrentLogSegment()
> runs. During endCurrentLogSegment(), bufReady may not yet be completely
> flushed when it is swapped with bufCurrent. When EditLogOutputStream's close()
> is finally called, bufCurrent still contains unflushed bytes, which causes the
> error reported in https://issues.apache.org/jira/browse/HDFS-10943
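> The interleaving described above can be replayed deterministically in a hypothetical Java model. `RollRaceModel` and `roll()` are invented for illustration. The model's swap deliberately does not check that bufReady was flushed, matching the scenario described here rather than any particular HDFS version, and the model's close() simply rejects a non-empty bufCurrent. With `roll(false)`, a background sync has swapped but not flushed when the segment is ended, so the end-of-segment swap moves the unflushed bytes back into bufCurrent and close() fails. With `roll(true)`, the in-flight flush completes first, as waitForSyncToFinish() would guarantee, and close() succeeds.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

/** Hypothetical single-threaded replay of the roll-while-flushing interleaving. Not HDFS code. */
class RollRaceModel {
    private ByteArrayOutputStream bufCurrent = new ByteArrayOutputStream();
    private ByteArrayOutputStream bufReady = new ByteArrayOutputStream();

    void write(String edit) {
        byte[] b = edit.getBytes(StandardCharsets.UTF_8);
        bufCurrent.write(b, 0, b.length);
    }

    /** Swaps without checking that bufReady was flushed: the hazard being modelled. */
    void setReadyToFlush() {
        ByteArrayOutputStream tmp = bufReady;
        bufReady = bufCurrent;
        bufCurrent = tmp;
    }

    void flushReady() {
        bufReady.reset(); // pretend the bytes reached storage
    }

    /** Loosely like a stream close(): refuse to close while bufCurrent still holds bytes. */
    void close() throws IOException {
        if (bufCurrent.size() != 0) {
            throw new IOException(bufCurrent.size() + " bytes still to be flushed");
        }
    }

    /**
     * An auto-scheduled sync has done step 1 (swap) but not step 2 (flush) when the log is
     * rolled. If waitFirst is true, that flush completes before the segment is ended.
     * Returns null on success, otherwise the close() error message.
     */
    static String roll(boolean waitFirst) {
        RollRaceModel s = new RollRaceModel();
        s.write("op1;");
        s.setReadyToFlush();        // background sync, step 1: "op1;" is now in bufReady
        if (waitFirst) {
            s.flushReady();         // background sync, step 2 finishes before the roll
        }
        s.write("END;");            // endCurrentLogSegment(true) writes its end marker
        s.setReadyToFlush();        // ...and swaps; an unflushed "op1;" would land in bufCurrent
        s.flushReady();
        try {
            s.close();
            return null;
        } catch (IOException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println("without wait: " + roll(false)); // without wait: 4 bytes still to be flushed
        System.out.println("with wait: " + roll(true));     // with wait: null
    }
}
```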
>
> So perhaps we should call waitForSyncToFinish() before endCurrentLogSegment()
> in the rollEditLog() method of the FSEditLog class, as below?
> {code:java}
> synchronized long rollEditLog() throws IOException {
>   LOG.info("Rolling edit logs");
>   waitForSyncToFinish();
>   endCurrentLogSegment(true);
>
>   long nextTxId = getLastWrittenTxId() + 1;
>   startLogSegment(nextTxId, true);
>
>   assert curSegmentTxId == nextTxId;
>   return nextTxId;
> }
> {code}
>
>
--
This message was sent by Atlassian Jira
(v8.3.2#803003)