diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 4353e14..f6e8331 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -432,14 +432,10 @@ DropSubscription(DropSubscriptionStmt *stmt)
 	Oid			subid;
 	Datum		datum;
 	bool		isnull;
-	char	   *subname;
 	char	   *conninfo;
+	char	   *subname;
 	char	   *slotname;
-	char		originname[NAMEDATALEN];
-	char	   *err = NULL;
-	RepOriginId	originid;
-	WalReceiverConn	   *wrconn = NULL;
-	StringInfoData		cmd;
+	LogicalRepWorker   *worker = NULL;
 
 	rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
 
@@ -508,56 +504,19 @@ DropSubscription(DropSubscriptionStmt *stmt)
 	/* Clean up dependencies */
 	deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
 
-	/* Protect against launcher restarting the worker. */
-	LWLockAcquire(LogicalRepLauncherLock, LW_EXCLUSIVE);
-
-	/* Kill the apply worker so that the slot becomes accessible. */
-	logicalrep_worker_stop(subid);
-
-	LWLockRelease(LogicalRepLauncherLock);
+	/* Mark the apply worker need to be stopped */
+	LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
 
-	/* Remove the origin tracking if exists. */
-	snprintf(originname, sizeof(originname), "pg_%u", subid);
-	originid = replorigin_by_name(originname, true);
-	if (originid != InvalidRepOriginId)
-		replorigin_drop(originid);
-
-	/* If the user asked to not drop the slot, we are done mow.*/
-	if (!stmt->drop_slot)
+	worker = logicalrep_worker_find(subid);
+	if (worker)
 	{
-		heap_close(rel, NoLock);
-		return;
+		if (stmt->drop_slot)
+			worker->drop_slot = true;
+		worker->need_to_stop = true;
 	}
+	on_commit_launcher_wakeup = true;
 
-	/*
-	 * Otherwise drop the replication slot at the publisher node using
-	 * the replication connection.
-	 */
-	load_file("libpqwalreceiver", false);
-
-	initStringInfo(&cmd);
-	appendStringInfo(&cmd, "DROP_REPLICATION_SLOT \"%s\"", slotname);
-
-	wrconn = walrcv_connect(conninfo, true, subname, &err);
-	if (wrconn == NULL)
-		ereport(ERROR,
-				(errmsg("could not connect to publisher when attempting to "
-						"drop the replication slot \"%s\"", slotname),
-				 errdetail("The error was: %s", err)));
-
-	if (!walrcv_command(wrconn, cmd.data, &err))
-		ereport(ERROR,
-				(errmsg("could not drop the replication slot \"%s\" on publisher",
-						slotname),
-				 errdetail("The error was: %s", err)));
-	else
-		ereport(NOTICE,
-				(errmsg("dropped replication slot \"%s\" on publisher",
-						slotname)));
-
-	walrcv_disconnect(wrconn);
-
-	pfree(cmd.data);
+	LWLockRelease(LogicalRepWorkerLock);
 
 	heap_close(rel, NoLock);
 }
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index d222cff..8e40960 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -73,7 +73,7 @@ static void logicalrep_worker_onexit(int code, Datum arg);
 static void logicalrep_worker_detach(void);
 
 bool		got_SIGTERM = false;
-static bool	on_commit_laucher_wakeup = false;
+bool	on_commit_launcher_wakeup = false;
 
 Datum pg_stat_get_subscription(PG_FUNCTION_ARGS);
 
@@ -274,6 +274,8 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid)
 	worker->dbid = dbid;
 	worker->userid = userid;
 	worker->subid = subid;
+	worker->need_to_stop = false;
+	worker->drop_slot = false;
 
 	LWLockRelease(LogicalRepWorkerLock);
 
@@ -305,28 +307,10 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid)
 /*
  * Stop the logical replication worker and wait until it detaches from the
  * slot.
- *
- * The caller must hold LogicalRepLauncherLock to ensure that new workers are
- * not being started during this function call.
  */
 void
-logicalrep_worker_stop(Oid subid)
+logicalrep_worker_stop(LogicalRepWorker *worker)
 {
-	LogicalRepWorker *worker;
-
-	Assert(LWLockHeldByMe(LogicalRepLauncherLock));
-
-	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-
-	worker = logicalrep_worker_find(subid);
-
-	/* No worker, nothing to do. */
-	if (!worker)
-	{
-		LWLockRelease(LogicalRepWorkerLock);
-		return;
-	}
-
 	/*
 	 * If we found worker but it does not have proc set it is starting up,
 	 * wait for it to finish and then kill it.
@@ -335,8 +319,6 @@ logicalrep_worker_stop(Oid subid)
 	{
 		int	rc;
 
-		LWLockRelease(LogicalRepWorkerLock);
-
 		CHECK_FOR_INTERRUPTS();
 
 		/* Wait for signal. */
@@ -370,7 +352,6 @@ logicalrep_worker_stop(Oid subid)
 
 	/* Now terminate the worker ... */
 	kill(worker->proc->pid, SIGTERM);
-	LWLockRelease(LogicalRepWorkerLock);
 
 	/* ... and wait for it to die. */
 	for (;;)
@@ -526,7 +507,7 @@ ApplyLauncherShmemInit(void)
 void
 AtCommit_ApplyLauncher(void)
 {
-	if (on_commit_laucher_wakeup)
+	if (on_commit_launcher_wakeup)
 		ApplyLauncherWakeup();
 }
 
@@ -540,8 +521,8 @@ AtCommit_ApplyLauncher(void)
 void
 ApplyLauncherWakeupAtCommit(void)
 {
-	if (!on_commit_laucher_wakeup)
-		on_commit_laucher_wakeup = true;
+	if (!on_commit_launcher_wakeup)
+		on_commit_launcher_wakeup = true;
 }
 
 void
@@ -579,6 +560,7 @@ ApplyLauncherMain(Datum main_arg)
 	/* Enter main loop */
 	while (!got_SIGTERM)
 	{
+		int			i;
 		int			rc;
 		List	   *sublist;
 		ListCell   *lc;
@@ -590,6 +572,15 @@ ApplyLauncherMain(Datum main_arg)
 
 		now = GetCurrentTimestamp();
 
+		/* Check if there is worker that need to be stopped */
+		for (i = 0; i < max_logical_replication_workers; i++)
+		{
+			LogicalRepWorker *w = &LogicalRepCtx->workers[i];
+
+			if (w->need_to_stop)
+				logicalrep_worker_stop(w);
+		}
+
 		/* Limit the start retry to once a wal_retrieve_retry_interval */
 		if (TimestampDifferenceExceeds(last_start_time, now,
 									   wal_retrieve_retry_interval))
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 9383960..4e666bc 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1022,7 +1022,7 @@ ApplyLoop(void)
 			}
 		}
 
-		if (!in_remote_transaction)
+		if (!in_remote_transaction && !MyLogicalRepWorker->need_to_stop)
 		{
 			/*
 			 * If we didn't get any transactions for a while there might be
@@ -1221,7 +1221,7 @@ reread_subscription(void)
 	/*
 	 * Exit if the subscription was removed.
 	 * This normally should not happen as the worker gets killed
-	 * during DROP SUBSCRIPTION.
+	 * after DROP SUBSCRIPTION committed.
 	 */
 	if (!newsub)
 	{
@@ -1321,6 +1321,7 @@ ApplyWorkerMain(Datum main_arg)
 	int				server_version;
 	TimeLineID		startpointTLI;
 	WalRcvStreamOptions options;
+	WalReceiverConn *conn = NULL;
 
 	/* Attach to slot */
 	logicalrep_worker_attach(worker_slot);
@@ -1422,8 +1423,55 @@ ApplyWorkerMain(Datum main_arg)
 	/* Run the main loop. */
 	ApplyLoop();
 
+	/* disconnect logical replication */
 	walrcv_disconnect(wrconn);
 
+	conn = walrcv_connect(MySubscription->conninfo, true,
+						  MySubscription->name, &err);
+	if (conn == NULL)
+		ereport(ERROR,
+				(errmsg("could not connect to publisher when attempting to "
+						"drop the replication slot \"%s\"", MySubscription->slotname),
+				 errdetail("The error was: %s", err)));
+
+	/*
+	 * Drop the replication slot at the publisher node using
+	 * the replication connection.
+	 */
+	if (MyLogicalRepWorker->drop_slot)
+	{
+		StringInfoData		cmd;
+
+		initStringInfo(&cmd);
+		appendStringInfo(&cmd, "DROP_REPLICATION_SLOT \"%s\"", MySubscription->slotname);
+
+		if (!walrcv_command(conn, cmd.data, &err))
+			ereport(ERROR,
+					(errmsg("could not drop the replication slot \"%s\" on publisher",
+							MySubscription->slotname),
+					 errdetail("The error was: %s", err)));
+		else
+			ereport(NOTICE,
+					(errmsg("dropped replication slot \"%s\" on publisher",
+							MySubscription->slotname)));
+		pfree(cmd.data);
+	}
+
+	/* Remove the origin tracking if exists. */
+	if (originid != InvalidRepOriginId)
+	{
+		replorigin_session_reset();
+		replorigin_session_origin = InvalidRepOriginId;
+		replorigin_session_origin_lsn = InvalidXLogRecPtr;
+		replorigin_session_origin_timestamp = 0;
+
+		StartTransactionCommand();
+		replorigin_drop(originid);
+		CommitTransactionCommand();
+	}
+
+	walrcv_disconnect(conn);
+
 	/* We should only get here if we received SIGTERM */
 	proc_exit(0);
 }
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 8cbf268..3db6f55 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -40,6 +40,8 @@ typedef struct LogicalRepWorker
 	TimestampTz	last_recv_time;
 	XLogRecPtr	reply_lsn;
 	TimestampTz	reply_time;
+	bool	need_to_stop;	/* Set by the backend executing DROP SUBSCRIPTION */
+	bool	drop_slot;		/* Drop replication slot when exits? */
 } LogicalRepWorker;
 
 /* libpqreceiver connection */
@@ -51,12 +53,13 @@ extern LogicalRepWorker	   *MyLogicalRepWorker;
 
 extern bool	in_remote_transaction;
 extern bool	got_SIGTERM;
+extern bool on_commit_launcher_wakeup;
 
 extern void logicalrep_worker_attach(int slot);
 extern LogicalRepWorker *logicalrep_worker_find(Oid subid);
 extern int logicalrep_worker_count(Oid subid);
 extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid);
-extern void logicalrep_worker_stop(Oid subid);
+extern void logicalrep_worker_stop(LogicalRepWorker *worker);
 extern void logicalrep_worker_wakeup(Oid subid);
 
 extern void logicalrep_worker_sigterm(SIGNAL_ARGS);
diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl
index b51740b..39dc97d 100644
--- a/src/test/subscription/t/001_rep_changes.pl
+++ b/src/test/subscription/t/001_rep_changes.pl
@@ -5,6 +5,27 @@ use PostgresNode;
 use TestLib;
 use Test::More tests => 11;
 
+sub test_sub
+{
+	my ($self, $expected, $msg, $sql) = @_;
+
+	my $timeout_max = 30;
+	my $timeout = 0;
+	my $result;
+
+	while ($timeout < $timeout_max)
+	{
+		$result = $self->safe_psql('postgres', $sql);
+
+		last if ($result eq $expected);
+
+		$timeout++;
+		sleep 1;
+	}
+
+	is ($result, $expected, $msg);
+}
+
 # Initialize publisher node
 my $node_publisher = get_new_node('publisher');
 $node_publisher->init(allows_streaming => 'logical');
@@ -169,6 +190,13 @@ $result =
   $node_subscriber->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab_full");
 is($result, qq(11|0|100), 'check replicated insert after alter publication');
 
+# check if DROP SUBSCRIPTION is transactional
+$node_subscriber->safe_psql('postgres', "
+BEGIN;
+DROP SUBSCRIPTION tap_sub;
+ROLLBACK;
+");
+
 # check all the cleanup
 $node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
 
@@ -176,13 +204,13 @@ $result =
   $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_subscription");
 is($result, qq(0), 'check subscription was dropped on subscriber');
 
-$result =
-  $node_publisher->safe_psql('postgres', "SELECT count(*) FROM pg_replication_slots");
-is($result, qq(0), 'check replication slot was dropped on publisher');
+test_sub($node_publisher, qq(0),
+		 'check replication slot was dropped on publisher',
+		 "SELECT count(*) FROM pg_replication_slots");
 
-$result =
-  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_replication_origin");
-is($result, qq(0), 'check replication origin was dropped on subscriber');
+test_sub($node_publisher, qq(0),
+		 'check replication origin was dropped on subscriber',
+		 "SELECT count(*) FROM pg_replication_origin");
 
 $node_subscriber->stop('fast');
 $node_publisher->stop('fast');
