Repository: kafka
Updated Branches:
  refs/heads/trunk 766dea94e -> 495184916


KAFKA-5132: abort long running transactions

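Abort any ongoing transaction that has been open for longer than its
transaction timeout. A periodic task on the transaction coordinator bumps the
producer epoch to fence the producer and then aborts the transaction.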

Author: Damian Guy <[email protected]>

Reviewers: Jason Gustafson, Apurva Mehta, Ismael Juma, Guozhang Wang

Closes #2957 from dguy/kafka-5132


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/49518491
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/49518491
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/49518491

Branch: refs/heads/trunk
Commit: 4951849163b1defea91129472b5354531407deb9
Parents: 766dea9
Author: Damian Guy <[email protected]>
Authored: Fri May 12 10:36:02 2017 -0700
Committer: Guozhang Wang <[email protected]>
Committed: Fri May 12 10:36:02 2017 -0700

----------------------------------------------------------------------
 .../transaction/TransactionCoordinator.scala    | 43 ++++++++++++-
 .../transaction/TransactionMarkerChannel.scala  |  3 +-
 .../TransactionMarkerChannelManager.scala       |  2 +-
 .../transaction/TransactionMetadata.scala       |  4 +-
 .../transaction/TransactionStateManager.scala   | 25 ++++++--
 .../main/scala/kafka/server/KafkaConfig.scala   |  6 ++
 .../TransactionCoordinatorTest.scala            | 66 ++++++++++++++++++++
 .../TransactionStateManagerTest.scala           | 32 ++++++++++
 8 files changed, 170 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/49518491/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index 38e725f..982e009 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -44,14 +44,15 @@ object TransactionCoordinator {
       config.transactionTopicReplicationFactor,
       config.transactionTopicSegmentBytes,
       config.transactionsLoadBufferSize,
-      config.transactionTopicMinISR)
+      config.transactionTopicMinISR,
+      config.transactionTransactionsExpiredTransactionCleanupIntervalMs)
 
     val pidManager = new ProducerIdManager(config.brokerId, zkUtils)
     val logManager = new TransactionStateManager(config.brokerId, zkUtils, 
scheduler, replicaManager, txnConfig, time)
     val txnMarkerPurgatory = 
DelayedOperationPurgatory[DelayedTxnMarker]("txn-marker-purgatory", 
config.brokerId)
     val transactionMarkerChannelManager = 
TransactionMarkerChannelManager(config, metrics, metadataCache, 
txnMarkerPurgatory, time)
 
-    new TransactionCoordinator(config.brokerId, pidManager, logManager, 
transactionMarkerChannelManager, txnMarkerPurgatory, time)
+    new TransactionCoordinator(config.brokerId, pidManager, logManager, 
transactionMarkerChannelManager, txnMarkerPurgatory, scheduler, time)
   }
 
   private def initTransactionError(error: Errors): InitPidResult = {
@@ -76,6 +77,7 @@ class TransactionCoordinator(brokerId: Int,
                              txnManager: TransactionStateManager,
                              txnMarkerChannelManager: 
TransactionMarkerChannelManager,
                              txnMarkerPurgatory: 
DelayedOperationPurgatory[DelayedTxnMarker],
+                             scheduler: Scheduler,
                              time: Time) extends Logging {
   this.logIdent = "[Transaction Coordinator " + brokerId + "]: "
 
@@ -383,11 +385,45 @@ class TransactionCoordinator(brokerId: Int,
 
   def partitionFor(transactionalId: String): Int = 
txnManager.partitionFor(transactionalId)
 
+  private def expireTransactions(): Unit = {
+
+    txnManager.transactionsToExpire().foreach{ idAndMetadata =>
+      idAndMetadata.metadata synchronized {
+        if 
(!txnManager.isCoordinatorLoadingInProgress(idAndMetadata.transactionalId)
+          && idAndMetadata.metadata.pendingState.isEmpty) {
+          // bump the producerEpoch so that any further requests for this 
transactionalId will be fenced
+          idAndMetadata.metadata.producerEpoch = 
(idAndMetadata.metadata.producerEpoch + 1).toShort
+          idAndMetadata.metadata.prepareTransitionTo(Ongoing)
+          txnManager.appendTransactionToLog(idAndMetadata.transactionalId, 
idAndMetadata.metadata, (errors: Errors) => {
+            if (errors != Errors.NONE)
+              warn(s"failed to append transactionalId 
${idAndMetadata.transactionalId} to log during transaction expiry. 
errors:$errors")
+            else
+              handleEndTransaction(idAndMetadata.transactionalId,
+                idAndMetadata.metadata.pid,
+                idAndMetadata.metadata.producerEpoch,
+                TransactionResult.ABORT,
+                (errors: Errors) => {
+                  if (errors != Errors.NONE)
+                    warn(s"rollback of transactionalId: 
${idAndMetadata.transactionalId} failed during transaction expiry. errors: 
$errors")
+                }
+              )
+          })
+        }
+      }
+    }
+  }
+
   /**
    * Startup logic executed at the same time when the server starts up.
    */
   def startup(enablePidExpiration: Boolean = true) {
     info("Starting up.")
+    scheduler.startup()
+    scheduler.schedule("transaction-expiration",
+      expireTransactions,
+      TransactionManager.DefaultRemoveExpiredTransactionsIntervalMs,
+      TransactionManager.DefaultRemoveExpiredTransactionsIntervalMs
+    )
     if (enablePidExpiration)
       txnManager.enablePidExpiration()
     txnMarkerChannelManager.start()
@@ -403,10 +439,11 @@ class TransactionCoordinator(brokerId: Int,
   def shutdown() {
     info("Shutting down.")
     isActive.set(false)
+    scheduler.shutdown()
+    txnMarkerPurgatory.shutdown()
     pidManager.shutdown()
     txnManager.shutdown()
     txnMarkerChannelManager.shutdown()
-    txnMarkerPurgatory.shutdown()
     info("Shutdown complete.")
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/49518491/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannel.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannel.scala
index cad3ea5..e60bd40 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannel.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannel.scala
@@ -166,9 +166,10 @@ class TransactionMarkerChannel(interBrokerListenerName: 
ListenerName,
     pendingTxnMap.get(PendingTxnKey(metadataPartition, pid))
   }
 
-  def clear(): Unit = {
+  def close(): Unit = {
     brokerStateMap.clear()
     pendingTxnMap.clear()
+    networkClient.close()
   }
 
   def removeStateForPartition(partition: Int): mutable.Iterable[Long] = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/49518491/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
index 2c17564..1b7ea56 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
@@ -107,7 +107,7 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
 
   def shutdown(): Unit = {
     interBrokerSendThread.shutdown()
-    transactionMarkerChannel.clear()
+    transactionMarkerChannel.close()
   }
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/49518491/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
index d84e054..a81e47b 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
@@ -161,8 +161,8 @@ private[coordinator] class TransactionMetadata(val pid: 
Long,
       txnTimeoutMs == other.txnTimeoutMs &&
       state.equals(other.state) &&
       topicPartitions.equals(other.topicPartitions) &&
-      transactionStartTime.equals(other.transactionStartTime) &&
-      lastUpdateTimestamp.equals(other.lastUpdateTimestamp)
+      transactionStartTime == other.transactionStartTime &&
+      lastUpdateTimestamp == other.lastUpdateTimestamp
     case _ => false
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/49518491/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index e23324f..f5dc3c0 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -43,6 +43,7 @@ object TransactionManager {
   // default transaction management config values
   val DefaultTransactionalIdExpirationMs = TimeUnit.DAYS.toMillis(7).toInt
   val DefaultTransactionsMaxTimeoutMs = TimeUnit.MINUTES.toMillis(15).toInt
+  val DefaultRemoveExpiredTransactionsIntervalMs = 
TimeUnit.MINUTES.toMillis(1).toInt
 }
 
 /**
@@ -82,9 +83,9 @@ class TransactionStateManager(brokerId: Int,
   private val transactionTopicPartitionCount = 
getTransactionTopicPartitionCount
 
   def enablePidExpiration() {
-    scheduler.startup()
-
-    // TODO: add transaction and pid expiration logic
+    if (!scheduler.isStarted)
+      scheduler.startup()
+    // TODO: add pid expiration logic
   }
 
   /**
@@ -142,6 +143,19 @@ class TransactionStateManager(brokerId: Int,
     loadingPartitions.contains(partitionId)
   }
 
+
+  def transactionsToExpire(): Iterable[TransactionalIdAndMetadata] = {
+    val now = time.milliseconds()
+    transactionMetadataCache.filter { case (_, metadata) =>
+      metadata.state match {
+        case Ongoing =>
+          metadata.transactionStartTime + metadata.txnTimeoutMs < now
+        case _ => false
+      }
+    }.map {case (id, metadata) =>
+      TransactionalIdAndMetadata(id, metadata)
+    }
+  }
   /**
    * Gets the partition count of the transaction log topic from ZooKeeper.
    * If the topic does not exist, the default partition count is returned.
@@ -445,4 +459,7 @@ private[transaction] case class 
TransactionConfig(transactionalIdExpirationMs: I
                                                   
transactionLogReplicationFactor: Short = 
TransactionLog.DefaultReplicationFactor,
                                                   transactionLogSegmentBytes: 
Int = TransactionLog.DefaultSegmentBytes,
                                                   
transactionLogLoadBufferSize: Int = TransactionLog.DefaultLoadBufferSize,
-                                                  
transactionLogMinInsyncReplicas: Int = TransactionLog.DefaultMinInSyncReplicas)
+                                                  
transactionLogMinInsyncReplicas: Int = TransactionLog.DefaultMinInSyncReplicas,
+                                                  
removeExpiredTransactionsIntervalMs: Int = 
TransactionManager.DefaultRemoveExpiredTransactionsIntervalMs)
+
+case class TransactionalIdAndMetadata(transactionalId: String, metadata: 
TransactionMetadata)

http://git-wip-us.apache.org/repos/asf/kafka/blob/49518491/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 708201a..76f6380 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -165,6 +165,7 @@ object Defaults {
   val TransactionsTopicReplicationFactor = 
TransactionLog.DefaultReplicationFactor
   val TransactionsTopicPartitions = TransactionLog.DefaultNumPartitions
   val TransactionsTopicSegmentBytes = TransactionLog.DefaultSegmentBytes
+  val TransactionsExpiredTransactionCleanupIntervalMS = 
TransactionManager.DefaultRemoveExpiredTransactionsIntervalMs
 
   /** ********* Quota Configuration ***********/
   val ProducerQuotaBytesPerSecondDefault = 
ClientQuotaManagerConfig.QuotaBytesPerSecondDefault
@@ -349,6 +350,8 @@ object KafkaConfig {
   val TransactionsTopicPartitionsProp = "transaction.state.log.num.partitions"
   val TransactionsTopicSegmentBytesProp = "transaction.state.log.segment.bytes"
   val TransactionsTopicReplicationFactorProp = 
"transaction.state.log.replication.factor"
+  val TransactionsExpiredTransactionCleanupIntervalMsProp = 
"transaction.expired.transaction.cleanup.interval.ms"
+
   /** ********* Quota Configuration ***********/
   val ProducerQuotaBytesPerSecondDefaultProp = "quota.producer.default"
   val ConsumerQuotaBytesPerSecondDefaultProp = "quota.consumer.default"
@@ -594,6 +597,7 @@ object KafkaConfig {
     "Internal topic creation will fail until the cluster size meets this 
replication factor requirement."
   val TransactionsTopicPartitionsDoc = "The number of partitions for the 
transaction topic (should not change after deployment)."
   val TransactionsTopicSegmentBytesDoc = "The transaction topic segment bytes 
should be kept relatively small in order to facilitate faster log compaction 
and cache loads"
+  val TransactionsExpiredTransactionCleanupIntervalMsDoc = "The interval at 
which to rollback expired transactions"
 
   /** ********* Quota Configuration ***********/
   val ProducerQuotaBytesPerSecondDefaultDoc = "DEPRECATED: Used only when 
dynamic default quotas are not configured for <user>, <client-id> or <user, 
client-id> in Zookeeper. " +
@@ -799,6 +803,7 @@ object KafkaConfig {
       .define(TransactionsTopicReplicationFactorProp, SHORT, 
Defaults.TransactionsTopicReplicationFactor, atLeast(1), HIGH, 
TransactionsTopicReplicationFactorDoc)
       .define(TransactionsTopicPartitionsProp, INT, 
Defaults.TransactionsTopicPartitions, atLeast(1), HIGH, 
TransactionsTopicPartitionsDoc)
       .define(TransactionsTopicSegmentBytesProp, INT, 
Defaults.TransactionsTopicSegmentBytes, atLeast(1), HIGH, 
TransactionsTopicSegmentBytesDoc)
+      .define(TransactionsExpiredTransactionCleanupIntervalMsProp, INT, 
Defaults.TransactionsExpiredTransactionCleanupIntervalMS, atLeast(1), LOW, 
TransactionsExpiredTransactionCleanupIntervalMsDoc)
 
       /** ********* Kafka Metrics Configuration ***********/
       .define(MetricNumSamplesProp, INT, Defaults.MetricNumSamples, 
atLeast(1), LOW, MetricNumSamplesDoc)
@@ -1008,6 +1013,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: 
Boolean) extends Abstra
   val transactionTopicReplicationFactor = 
getShort(KafkaConfig.TransactionsTopicReplicationFactorProp)
   val transactionTopicPartitions = 
getInt(KafkaConfig.TransactionsTopicPartitionsProp)
   val transactionTopicSegmentBytes = 
getInt(KafkaConfig.TransactionsTopicSegmentBytesProp)
+  val transactionTransactionsExpiredTransactionCleanupIntervalMs = 
getInt(KafkaConfig.TransactionsExpiredTransactionCleanupIntervalMsProp)
 
   /** ********* Metric Configuration **************/
   val metricNumSamples = getInt(KafkaConfig.MetricNumSamplesProp)

http://git-wip-us.apache.org/repos/asf/kafka/blob/49518491/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
index cf773bb..a9f1bca 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
@@ -17,6 +17,7 @@
 package kafka.coordinator.transaction
 
 import kafka.server.DelayedOperationPurgatory
+import kafka.utils.MockScheduler
 import kafka.utils.timer.MockTimer
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
@@ -46,12 +47,14 @@ class TransactionCoordinatorTest {
 
   private val txnMarkerPurgatory = new 
DelayedOperationPurgatory[DelayedTxnMarker]("test", new MockTimer, 
reaperEnabled = false)
   private val partitions = mutable.Set[TopicPartition](new 
TopicPartition("topic1", 0))
+  private val scheduler = new MockScheduler(time)
 
   val coordinator: TransactionCoordinator = new 
TransactionCoordinator(brokerId,
     pidManager,
     transactionManager,
     transactionMarkerChannelManager,
     txnMarkerPurgatory,
+    scheduler,
     time)
 
   var result: InitPidResult = _
@@ -613,6 +616,69 @@ class TransactionCoordinatorTest {
     EasyMock.verify(transactionManager)
   }
 
+  @Test
+  def shouldAbortExpiredTransactionsInOngoingState(): Unit = {
+    EasyMock.expect(transactionManager.transactionsToExpire())
+    .andReturn(List(TransactionalIdAndMetadata(transactionalId,
+      new TransactionMetadata(pid, epoch, 0, Ongoing, partitions, 
time.milliseconds(), time.milliseconds()))))
+
+    // should bump the epoch and append to the log
+    val metadata = new TransactionMetadata(pid, (epoch + 1).toShort, 0, 
Ongoing, partitions, time.milliseconds(), time.milliseconds())
+    
EasyMock.expect(transactionManager.appendTransactionToLog(EasyMock.eq(transactionalId),
+      EasyMock.eq(metadata),
+      EasyMock.capture(capturedErrorsCallback)))
+    .andAnswer(new IAnswer[Unit] {
+      override def answer(): Unit = {
+        capturedErrorsCallback.getValue.apply(Errors.NONE)
+      }
+    }).once()
+
+    EasyMock.expect(transactionManager.isCoordinatorFor(transactionalId))
+      .andReturn(true)
+    EasyMock.expect(transactionManager.getTransactionState(transactionalId))
+      .andReturn(Some(metadata))
+      .once()
+
+    // now should perform the rollback and append the state as PrepareAbort
+    val abortMetadata = metadata.copy()
+    abortMetadata.state = PrepareAbort
+    // need to allow for the time.sleep below
+    abortMetadata.lastUpdateTimestamp = time.milliseconds() + 
TransactionManager.DefaultRemoveExpiredTransactionsIntervalMs
+
+    
EasyMock.expect(transactionManager.appendTransactionToLog(EasyMock.eq(transactionalId),
+      EasyMock.eq(abortMetadata),
+      EasyMock.capture(capturedErrorsCallback)))
+      .andAnswer(new IAnswer[Unit] {
+        override def answer(): Unit = {}
+      })
+    .once()
+
+    EasyMock.replay(transactionManager, transactionMarkerChannelManager)
+
+    coordinator.startup(false)
+    time.sleep(TransactionManager.DefaultRemoveExpiredTransactionsIntervalMs)
+    scheduler.tick()
+    EasyMock.verify(transactionManager)
+  }
+
+  @Test
+  def shouldNotAbortExpiredTransactionsThatHaveAPendingStateTransition(): Unit 
= {
+    val metadata = new TransactionMetadata(pid, epoch, 0, Ongoing, partitions, 
time.milliseconds(), time.milliseconds())
+    metadata.prepareTransitionTo(PrepareCommit)
+
+    EasyMock.expect(transactionManager.transactionsToExpire())
+      .andReturn(List(TransactionalIdAndMetadata(transactionalId,
+        metadata)))
+    
+    EasyMock.replay(transactionManager, transactionMarkerChannelManager)
+    coordinator.startup(false)
+
+    time.sleep(TransactionManager.DefaultRemoveExpiredTransactionsIntervalMs)
+    scheduler.tick()
+    EasyMock.verify(transactionManager)
+
+  }
+
   private def 
validateRespondsWithConcurrentTransactionsOnInitPidWhenInPrepareState(state: 
TransactionState) = {
     val transactionId = "tid"
     EasyMock.expect(transactionManager.isCoordinatorFor(transactionId))

http://git-wip-us.apache.org/repos/asf/kafka/blob/49518491/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 09a89dd..2a14898 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -305,6 +305,38 @@ class TransactionStateManagerTest {
   }
 
   @Test
+  def shouldOnlyConsiderTransactionsInTheOngoingStateForExpiry(): Unit = {
+    txnMetadata1.state = Ongoing
+    txnMetadata1.transactionStartTime = time.milliseconds()
+    transactionManager.addTransaction(txnId1, txnMetadata1)
+    transactionManager.addTransaction(txnId2, txnMetadata2)
+
+    val ongoingButNotExpiring = txnMetadata1.copy()
+    ongoingButNotExpiring.txnTimeoutMs = 10000
+    transactionManager.addTransaction("not-expiring", ongoingButNotExpiring)
+
+    val prepareCommit = txnMetadata1.copy()
+    prepareCommit.state = PrepareCommit
+    transactionManager.addTransaction("pc", prepareCommit)
+
+    val prepareAbort = txnMetadata1.copy()
+    prepareAbort.state = PrepareAbort
+    transactionManager.addTransaction("pa", prepareAbort)
+
+    val committed = txnMetadata1.copy()
+    committed.state = CompleteCommit
+    transactionManager.addTransaction("cc", committed)
+
+    val aborted = txnMetadata1.copy()
+    aborted.state = CompleteAbort
+    transactionManager.addTransaction("ca", aborted)
+
+    time.sleep(2000)
+    val expiring = transactionManager.transactionsToExpire()
+    assertEquals(List(TransactionalIdAndMetadata(txnId1, txnMetadata1)), 
expiring)
+  }
+
+  @Test
   def shouldWriteTxnMarkersForTransactionInPreparedCommitState(): Unit = {
     verifyWritesTxnMarkersInPrepareState(PrepareCommit)
   }

Reply via email to