Add an ex_resp_sync flag to the fc_exch_done() API in fc_exch.c.
If ex_resp_sync is non-zero, fc_exch_done() blocks until all
already-active exch resp handlers have finished running.

This is prep work in fc_exch.c to fix a race between the upper layers
and fc_exch.c: the exch resp handler can still be called after the
upper layer's error handling has already issued exchange done, at which
point the upper layer expects no further exch resp calls.

I still need to make use of this functionality in FCP.

This race was found by Mike Christie, as discussed in the email at
http://www.open-fcoe.org/pipermail/devel/2008-July/000457.html.
This patch implements the solution discussed in that email thread,
which is similar to del_timer_sync().

Alternatively, I can add a callback function pointer to free the fsp pkt
and call this callback from fc_exch.c just before the exch is
freed/released. FCP can take an extra fsp pkt hold for this callback.
I think the callback approach will work without the restriction that
comes with the ex_resp_sync flag, namely that the flag may only be set
from the error handling calling context in FCP.

Signed-off-by: Vasu Dev <[EMAIL PROTECTED]>
---

 drivers/scsi/libfc/fc_exch.c  |   30 ++++++++++++++++++++++++------
 drivers/scsi/libfc/fc_fcp.c   |   20 ++++++++++----------
 drivers/scsi/libfc/fc_lport.c |    2 +-
 include/scsi/libfc/libfc.h    |   12 ++++++++++--
 4 files changed, 45 insertions(+), 19 deletions(-)


diff --git a/drivers/scsi/libfc/fc_exch.c b/drivers/scsi/libfc/fc_exch.c
index 342af7e..5ac55d4 100644
--- a/drivers/scsi/libfc/fc_exch.c
+++ b/drivers/scsi/libfc/fc_exch.c
@@ -93,6 +93,8 @@ struct fc_exch {
         */
        void            (*resp)(struct fc_seq *, struct fc_frame *, void *);
        void            *resp_arg;      /* 3rd arg for exchange resp handler */
+       struct completion resp_done;
+       atomic_t  resp_active;
 };
 
 /*
@@ -521,6 +523,7 @@ struct fc_exch *fc_exch_alloc(struct fc_exch_mgr *mp, u16 
xid)
        ep->f_ctl = FC_FC_FIRST_SEQ;    /* next seq is first seq */
        ep->rxid = FC_XID_UNKNOWN;
        ep->class = mp->class;
+       init_completion(&ep->resp_done);
 
        spin_lock_init(&ep->ex_lock);
        setup_timer(&ep->ex_timer, fc_exch_timeout, (unsigned long)ep);
@@ -1189,17 +1192,20 @@ static void fc_exch_recv_seq_resp(struct fc_exch_mgr 
*mp, struct fc_frame *fp)
 
        if (fc_sof_needs_ack(sof))
                fc_seq_send_ack(sp, fp);
+
+       spin_lock_bh(&ep->ex_lock);
        resp = ep->resp;
        ex_resp_arg = ep->resp_arg;
+       if (resp)
+               atomic_inc(&ep->resp_active);
 
        if (fh->fh_type != FC_TYPE_FCP && fr_eof(fp) == FC_EOF_T &&
            (f_ctl & (FC_FC_LAST_SEQ | FC_FC_END_SEQ)) ==
            (FC_FC_LAST_SEQ | FC_FC_END_SEQ)) {
-               spin_lock_bh(&ep->ex_lock);
                fc_exch_complete_locked(ep);
                WARN_ON(fc_seq_exch(sp) != ep);
-               spin_unlock_bh(&ep->ex_lock);
        }
+       spin_unlock_bh(&ep->ex_lock);
 
        /*
         * Call the receive function.
@@ -1214,9 +1220,11 @@ static void fc_exch_recv_seq_resp(struct fc_exch_mgr 
*mp, struct fc_frame *fp)
         * If new exch resp handler is valid then call that
         * first.
         */
-       if (resp)
+       if (resp) {
                resp(sp, fp, ex_resp_arg);
-       else
+               atomic_dec(&ep->resp_active);
+               complete(&ep->resp_done);
+       } else
                fc_frame_free(fp);
        fc_exch_release(ep);
        return;
@@ -1513,7 +1521,7 @@ static void fc_exch_els_rec(struct fc_seq *sp, struct 
fc_frame *rfp)
 
        fp = fc_frame_alloc(fc_seq_exch(sp)->lp, sizeof(*acc));
        if (!fp) {
-               fc_exch_done(sp);
+               fc_exch_done(sp, 0);
                goto out;
        }
        sp = fc_seq_start_next(sp);
@@ -1725,7 +1733,7 @@ void fc_exch_mgr_free(struct fc_exch_mgr *mp)
 }
 EXPORT_SYMBOL(fc_exch_mgr_free);
 
-void fc_exch_done(struct fc_seq *sp)
+void fc_exch_done(struct fc_seq *sp, u8 ex_resp_sync)
 {
        struct fc_exch *ep;
 
@@ -1737,6 +1745,16 @@ void fc_exch_done(struct fc_seq *sp)
                atomic_dec(&ep->ex_refcnt);     /* drop hold for timer */
 
        spin_unlock_bh(&ep->ex_lock);
+
+       /*
+        * if upper layer requested to sync with
+        * all running exch resp handlers
+        * then wait for completion of all
+        * already active exch resp handlers
+        * of the exch.
+        */
+       while (ex_resp_sync && atomic_read(&ep->resp_active))
+               wait_for_completion_timeout(&ep->resp_done, HZ);
        fc_exch_release(fc_seq_exch(sp));
 }
 EXPORT_SYMBOL(fc_exch_done);
diff --git a/drivers/scsi/libfc/fc_fcp.c b/drivers/scsi/libfc/fc_fcp.c
index c422d10..755bc90 100644
--- a/drivers/scsi/libfc/fc_fcp.c
+++ b/drivers/scsi/libfc/fc_fcp.c
@@ -819,9 +819,9 @@ static void fc_fcp_complete(struct fc_fcp_pkt *fsp)
                                f_ctl |= FC_FC_LAST_SEQ | FC_FC_END_SEQ;
                                lp->tt.seq_send(lp, csp, conf_frame, f_ctl);
                        } else
-                               lp->tt.exch_done(csp);
+                               lp->tt.exch_done(csp, 0);
                } else
-                       lp->tt.exch_done(sp);
+                       lp->tt.exch_done(sp, 0);
        }
        fc_io_compl(fsp);
 }
@@ -882,7 +882,7 @@ static void fc_fcp_cleanup_aborted_io(struct fc_fcp_pkt 
*fsp)
 
        if (!(fsp->state & FC_SRB_RCV_STATUS)) {
                if (fsp->seq_ptr) {
-                       lp->tt.exch_done(fsp->seq_ptr);
+                       lp->tt.exch_done(fsp->seq_ptr, 0);
                        fsp->seq_ptr = NULL;
                }
        }
@@ -1037,7 +1037,7 @@ static int fc_fcp_pkt_abort(struct fc_lport *lp, struct 
fc_fcp_pkt *fsp)
        spin_lock(&fsp->scsi_pkt_lock);
 
        if (fsp->seq_ptr) {
-               lp->tt.exch_done(fsp->seq_ptr);
+               lp->tt.exch_done(fsp->seq_ptr, 0);
                fsp->seq_ptr = NULL;
        }
 
@@ -1110,7 +1110,7 @@ static void fc_fcp_cleanup_lun_reset(struct fc_fcp_pkt 
*fsp)
 
        fsp->status_code = FC_CMD_ABORTED;
        if (fsp->seq_ptr) {
-               lp->tt.exch_done(fsp->seq_ptr);
+               lp->tt.exch_done(fsp->seq_ptr, 0);
                fsp->seq_ptr = NULL;
        }
 }
@@ -1153,7 +1153,7 @@ static int fc_lun_reset(struct fc_lport *lp, struct 
fc_fcp_pkt *fsp,
                 * it and it could allow the fsp to be freed from under
                 * fc_tm_done.
                 */
-               lp->tt.exch_done(fsp->seq_ptr);
+               lp->tt.exch_done(fsp->seq_ptr, 0);
                fsp->seq_ptr = NULL;
        }
        fsp->wait_for_comp = 0;
@@ -1204,7 +1204,7 @@ static void fc_tm_done(struct fc_seq *sp, struct fc_frame 
*fp, void *arg)
 
        fc_fcp_fcp_resp(fsp, fp);
        fsp->seq_ptr = NULL;
-       fsp->lp->tt.exch_done(sp);
+       fsp->lp->tt.exch_done(sp, 0);
        fc_frame_free(fp);
        spin_unlock(&fsp->scsi_pkt_lock);
 }
@@ -1545,7 +1545,7 @@ static void fc_timeout_error(struct fc_fcp_pkt *fsp)
 static void fc_fcp_retry_cmd(struct fc_fcp_pkt *fsp)
 {
        if (fsp->seq_ptr) {
-               fsp->lp->tt.exch_done(fsp->seq_ptr);
+               fsp->lp->tt.exch_done(fsp->seq_ptr, 0);
                fsp->seq_ptr = NULL;
        }
 
@@ -1642,7 +1642,7 @@ static void fc_fcp_srr_resp(struct fc_seq *sp, struct 
fc_frame *fp, void *arg)
                break;
        }
        fc_fcp_unlock_pkt(fsp);
-       fsp->lp->tt.exch_done(sp);
+       fsp->lp->tt.exch_done(sp, 0);
 out:
        fc_frame_free(fp);
        fc_fcp_pkt_release(fsp);        /* drop hold for outstanding SRR */
@@ -1652,7 +1652,7 @@ static void fc_fcp_srr_error(struct fc_fcp_pkt *fsp, 
struct fc_frame *fp)
 {
        if (fc_fcp_lock_pkt(fsp))
                goto out;
-       fsp->lp->tt.exch_done(fsp->recov_seq);
+       fsp->lp->tt.exch_done(fsp->recov_seq, 0);
        fsp->recov_seq = NULL;
        switch (PTR_ERR(fp)) {
        case -FC_EX_CLOSED:                     /* e.g., link failure */
diff --git a/drivers/scsi/libfc/fc_lport.c b/drivers/scsi/libfc/fc_lport.c
index 33cd556..800e5f0 100644
--- a/drivers/scsi/libfc/fc_lport.c
+++ b/drivers/scsi/libfc/fc_lport.c
@@ -639,7 +639,7 @@ static void fc_lport_recv(struct fc_lport *lp, struct 
fc_seq *sp,
         *  The common exch_done for all request may not be good
         *  if any request requires longer hold on exhange. XXX
         */
-       lp->tt.exch_done(sp);
+       lp->tt.exch_done(sp, 0);
 }
 
 /*
diff --git a/include/scsi/libfc/libfc.h b/include/scsi/libfc/libfc.h
index d3a2569..0533bd2 100644
--- a/include/scsi/libfc/libfc.h
+++ b/include/scsi/libfc/libfc.h
@@ -251,8 +251,12 @@ struct libfc_function_template {
        /*
         * Indicate that an exchange/sequence tuple is complete and the memory
         * allocated for the related objects may be freed.
+        * if ex_resp_sync is not zero then this function
+        * blocks till all already active exch resp functions
+        * finish running. The ex_resp_sync must be zero if
+        * called from exch resp function context.
         */
-       void (*exch_done)(struct fc_seq *sp);
+       void (*exch_done)(struct fc_seq *sp, u8 ex_resp_sync);
 
        /*
         * Assigns a EM and a free XID for an new exchange and then
@@ -684,8 +688,12 @@ int fc_seq_exch_abort(const struct fc_seq *req_sp);
 /*
  * Indicate that an exchange/sequence tuple is complete and the memory
  * allocated for the related objects may be freed.
+ * if ex_resp_sync is not zero then this function
+ * blocks till all already active exch resp functions
+ * finish running. The ex_resp_sync must be zero if
+ * called from exch resp function context.
  */
-void fc_exch_done(struct fc_seq *sp);
+void fc_exch_done(struct fc_seq *sp, u8 ex_resp_sync);
 
 /*
  * Assigns a EM and XID for a frame and then allocates

_______________________________________________
devel mailing list
[email protected]
http://www.open-fcoe.org/mailman/listinfo/devel

Reply via email to