Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5239#discussion_r165349012
--- Diff:
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
---
@@ -363,1691 +372,1780 @@ public int getKeyGroupPrefixBytes() {
final CheckpointStreamFactory streamFactory,
CheckpointOptions checkpointOptions) throws Exception {
- if (checkpointOptions.getCheckpointType() !=
CheckpointOptions.CheckpointType.SAVEPOINT &&
- enableIncrementalCheckpointing) {
- return snapshotIncrementally(checkpointId, timestamp,
streamFactory);
- } else {
- return snapshotFully(checkpointId, timestamp,
streamFactory);
- }
+ return snapshotStrategy.performSnapshot(checkpointId,
timestamp, streamFactory, checkpointOptions);
}
- private RunnableFuture<SnapshotResult<KeyedStateHandle>>
snapshotIncrementally(
- final long checkpointId,
- final long checkpointTimestamp,
- final CheckpointStreamFactory checkpointStreamFactory) throws
Exception {
-
- if (db == null) {
- throw new IOException("RocksDB closed.");
- }
+ @Override
+ public void restore(StateObjectCollection<KeyedStateHandle>
restoreState) throws Exception {
+ LOG.info("Initializing RocksDB keyed state backend from
snapshot.");
- if (kvStateInformation.isEmpty()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Asynchronous RocksDB snapshot
performed on empty keyed state at " +
- checkpointTimestamp + " . Returning
null.");
- }
- return DoneFuture.nullValue();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Restoring snapshot from state handles: {}.",
restoreState);
}
- final RocksDBIncrementalSnapshotOperation<K> snapshotOperation =
- new RocksDBIncrementalSnapshotOperation<>(
- this,
- checkpointStreamFactory,
- checkpointId,
- checkpointTimestamp);
-
- snapshotOperation.takeSnapshot();
-
- return new FutureTask<SnapshotResult<KeyedStateHandle>>(
- () -> snapshotOperation.materializeSnapshot()
- ) {
- @Override
- public boolean cancel(boolean mayInterruptIfRunning) {
- snapshotOperation.stop();
- return super.cancel(mayInterruptIfRunning);
- }
+ // clear all meta data
+ kvStateInformation.clear();
+ restoredKvStateMetaInfos.clear();
- @Override
- protected void done() {
-
snapshotOperation.releaseResources(isCancelled());
+ try {
+ if (restoreState == null || restoreState.isEmpty()) {
+ createDB();
+ } else if (restoreState.iterator().next() instanceof
IncrementalKeyedStateHandle) {
+ RocksDBIncrementalRestoreOperation<K>
restoreOperation = new RocksDBIncrementalRestoreOperation<>(this);
+ restoreOperation.restore(restoreState);
+ } else {
+ RocksDBFullRestoreOperation<K> restoreOperation
= new RocksDBFullRestoreOperation<>(this);
+ restoreOperation.doRestore(restoreState);
}
- };
+ } catch (Exception ex) {
+ dispose();
+ throw ex;
+ }
}
- private RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotFully(
- final long checkpointId,
- final long timestamp,
- final CheckpointStreamFactory streamFactory) throws Exception {
-
- long startTime = System.currentTimeMillis();
- final CloseableRegistry snapshotCloseableRegistry = new
CloseableRegistry();
-
- final RocksDBFullSnapshotOperation<K> snapshotOperation;
-
- if (kvStateInformation.isEmpty()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Asynchronous RocksDB snapshot
performed on empty keyed state at " + timestamp +
- " . Returning null.");
- }
+ @Override
+ public void notifyCheckpointComplete(long completedCheckpointId) {
- return DoneFuture.nullValue();
+ if (!enableIncrementalCheckpointing) {
+ return;
}
- snapshotOperation = new RocksDBFullSnapshotOperation<>(this,
streamFactory, snapshotCloseableRegistry);
- snapshotOperation.takeDBSnapShot(checkpointId, timestamp);
-
- // implementation of the async IO operation, based on FutureTask
-
AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>> ioCallable
=
- new
AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>>() {
+ synchronized (materializedSstFiles) {
- @Override
- protected void acquireResources() throws
Exception {
-
cancelStreamRegistry.registerCloseable(snapshotCloseableRegistry);
-
snapshotOperation.openCheckpointStream();
- }
+ if (completedCheckpointId < lastCompletedCheckpointId) {
+ return;
+ }
- @Override
- protected void releaseResources() throws
Exception {
- closeLocalRegistry();
- releaseSnapshotOperationResources();
- }
+ materializedSstFiles.keySet().removeIf(checkpointId ->
checkpointId < completedCheckpointId);
- private void
releaseSnapshotOperationResources() {
- // hold the db lock while operation on
the db to guard us against async db disposal
-
snapshotOperation.releaseSnapshotResources();
- }
+ lastCompletedCheckpointId = completedCheckpointId;
+ }
+ }
- @Override
- protected void stopOperation() throws Exception
{
- closeLocalRegistry();
- }
+ private void createDB() throws IOException {
+ List<ColumnFamilyHandle> columnFamilyHandles = new
ArrayList<>(1);
+ this.db = openDB(instanceRocksDBPath.getAbsolutePath(),
Collections.emptyList(), columnFamilyHandles);
+ this.defaultColumnFamily = columnFamilyHandles.get(0);
+ }
- private void closeLocalRegistry() {
- if
(cancelStreamRegistry.unregisterCloseable(snapshotCloseableRegistry)) {
- try {
-
snapshotCloseableRegistry.close();
- } catch (Exception ex) {
- LOG.warn("Error closing
local registry", ex);
- }
- }
- }
+ private RocksDB openDB(
+ String path,
+ List<ColumnFamilyDescriptor> stateColumnFamilyDescriptors,
+ List<ColumnFamilyHandle> stateColumnFamilyHandles) throws
IOException {
- @Override
- public SnapshotResult<KeyedStateHandle>
performOperation() throws Exception {
- long startTime =
System.currentTimeMillis();
+ List<ColumnFamilyDescriptor> columnFamilyDescriptors =
+ new ArrayList<>(1 +
stateColumnFamilyDescriptors.size());
- if (isStopped()) {
- throw new IOException("RocksDB
closed.");
- }
+ columnFamilyDescriptors.addAll(stateColumnFamilyDescriptors);
- snapshotOperation.writeDBSnapshot();
+ // we add the required descriptor for the default CF in last
position.
+ columnFamilyDescriptors.add(new
ColumnFamilyDescriptor(DEFAULT_COLUMN_FAMILY_NAME_BYTES, columnOptions));
- LOG.info("Asynchronous RocksDB snapshot
({}, asynchronous part) in thread {} took {} ms.",
- streamFactory,
Thread.currentThread(), (System.currentTimeMillis() - startTime));
+ RocksDB dbRef;
- KeyGroupsStateHandle stateHandle =
snapshotOperation.getSnapshotResultStateHandle();
- return new
SnapshotResult<>(stateHandle, null);
- }
- };
+ try {
+ dbRef = RocksDB.open(
+ Preconditions.checkNotNull(dbOptions),
+ Preconditions.checkNotNull(path),
+ columnFamilyDescriptors,
+ stateColumnFamilyHandles);
+ } catch (RocksDBException e) {
+ throw new IOException("Error while opening RocksDB
instance.", e);
+ }
- LOG.info("Asynchronous RocksDB snapshot (" + streamFactory + ",
synchronous part) in thread " +
- Thread.currentThread() + " took " +
(System.currentTimeMillis() - startTime) + " ms.");
+ // requested + default CF
+ Preconditions.checkState(1 +
stateColumnFamilyDescriptors.size() == stateColumnFamilyHandles.size(),
+ "Not all requested column family handles have been
created");
- return AsyncStoppableTaskWithCallback.from(ioCallable);
+ return dbRef;
}
/**
- * Encapsulates the process to perform a snapshot of a
RocksDBKeyedStateBackend.
+ * Encapsulates the process of restoring a RocksDBKeyedStateBackend
from a full snapshot.
*/
- static final class RocksDBFullSnapshotOperation<K> {
+ private static final class RocksDBFullRestoreOperation<K> {
- static final int FIRST_BIT_IN_BYTE_MASK = 0x80;
- static final int END_OF_KEY_GROUP_MARK = 0xFFFF;
-
- private final RocksDBKeyedStateBackend<K> stateBackend;
- private final KeyGroupRangeOffsets keyGroupRangeOffsets;
- private final CheckpointStreamFactory checkpointStreamFactory;
- private final CloseableRegistry snapshotCloseableRegistry;
- private final ResourceGuard.Lease dbLease;
-
- private long checkpointId;
- private long checkpointTimeStamp;
-
- private Snapshot snapshot;
- private ReadOptions readOptions;
- private List<Tuple2<RocksIterator, Integer>> kvStateIterators;
-
- private CheckpointStreamFactory.CheckpointStateOutputStream
outStream;
- private DataOutputView outputView;
-
- RocksDBFullSnapshotOperation(
- RocksDBKeyedStateBackend<K> stateBackend,
- CheckpointStreamFactory checkpointStreamFactory,
- CloseableRegistry registry) throws IOException {
+ private final RocksDBKeyedStateBackend<K>
rocksDBKeyedStateBackend;
- this.stateBackend = stateBackend;
- this.checkpointStreamFactory = checkpointStreamFactory;
- this.keyGroupRangeOffsets = new
KeyGroupRangeOffsets(stateBackend.keyGroupRange);
- this.snapshotCloseableRegistry = registry;
- this.dbLease =
this.stateBackend.rocksDBResourceGuard.acquireResource();
- }
+ /** Current key-groups state handle from which we restore
key-groups. */
+ private KeyGroupsStateHandle currentKeyGroupsStateHandle;
+ /** Current input stream we obtained from
currentKeyGroupsStateHandle. */
+ private FSDataInputStream currentStateHandleInStream;
+ /** Current data input view that wraps
currentStateHandleInStream. */
+ private DataInputView currentStateHandleInView;
+ /** Current list of ColumnFamilyHandles for all column families
we restore from currentKeyGroupsStateHandle. */
+ private List<ColumnFamilyHandle>
currentStateHandleKVStateColumnFamilies;
+ /** The compression decorator that was used for writing the
state, as determined by the meta data. */
+ private StreamCompressionDecorator
keygroupStreamCompressionDecorator;
/**
- * 1) Create a snapshot object from RocksDB.
+ * Creates a restore operation object for the given state
backend instance.
*
- * @param checkpointId id of the checkpoint for which we take
the snapshot
- * @param checkpointTimeStamp timestamp of the checkpoint for
which we take the snapshot
+ * @param rocksDBKeyedStateBackend the state backend into which
we restore
*/
- public void takeDBSnapShot(long checkpointId, long
checkpointTimeStamp) {
- Preconditions.checkArgument(snapshot == null, "Only one
ongoing snapshot allowed!");
- this.kvStateIterators = new
ArrayList<>(stateBackend.kvStateInformation.size());
- this.checkpointId = checkpointId;
- this.checkpointTimeStamp = checkpointTimeStamp;
- this.snapshot = stateBackend.db.getSnapshot();
+ public RocksDBFullRestoreOperation(RocksDBKeyedStateBackend<K>
rocksDBKeyedStateBackend) {
+ this.rocksDBKeyedStateBackend =
Preconditions.checkNotNull(rocksDBKeyedStateBackend);
}
/**
- * 2) Open CheckpointStateOutputStream through the
checkpointStreamFactory into which we will write.
+ * Restores all key-groups data that is referenced by the
passed state handles.
*
- * @throws Exception
+ * @param keyedStateHandles List of all key groups state
handles that shall be restored.
*/
- public void openCheckpointStream() throws Exception {
- Preconditions.checkArgument(outStream == null, "Output
stream for snapshot is already set.");
- outStream =
checkpointStreamFactory.createCheckpointStateOutputStream(checkpointId,
checkpointTimeStamp);
- snapshotCloseableRegistry.registerCloseable(outStream);
- outputView = new DataOutputViewStreamWrapper(outStream);
- }
+ public void doRestore(Collection<KeyedStateHandle>
keyedStateHandles)
+ throws IOException, StateMigrationException,
RocksDBException {
- /**
- * 3) Write the actual data from RocksDB from the time we took
the snapshot object in (1).
- *
- * @throws IOException
- */
- public void writeDBSnapshot() throws IOException,
InterruptedException {
+ rocksDBKeyedStateBackend.createDB();
- if (null == snapshot) {
- throw new IOException("No snapshot available.
Might be released due to cancellation.");
- }
+ for (KeyedStateHandle keyedStateHandle :
keyedStateHandles) {
+ if (keyedStateHandle != null) {
- Preconditions.checkNotNull(outStream, "No output stream
to write snapshot.");
- writeKVStateMetaData();
- writeKVStateData();
+ if (!(keyedStateHandle instanceof
KeyGroupsStateHandle)) {
+ throw new
IllegalStateException("Unexpected state handle type, " +
+ "expected: " +
KeyGroupsStateHandle.class +
+ ", but found: " +
keyedStateHandle.getClass());
+ }
+ this.currentKeyGroupsStateHandle =
(KeyGroupsStateHandle) keyedStateHandle;
+ restoreKeyGroupsInStateHandle();
+ }
+ }
}
/**
- * 4) Returns a state handle to the snapshot after the snapshot
procedure is completed and null before.
- *
- * @return state handle to the completed snapshot
+ * Restore one key groups state handle.
*/
- public KeyGroupsStateHandle getSnapshotResultStateHandle()
throws IOException {
-
- if
(snapshotCloseableRegistry.unregisterCloseable(outStream)) {
-
- StreamStateHandle stateHandle =
outStream.closeAndGetHandle();
- outStream = null;
-
- if (stateHandle != null) {
- return new
KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle);
+ private void restoreKeyGroupsInStateHandle()
+ throws IOException, StateMigrationException,
RocksDBException {
+ try {
+ currentStateHandleInStream =
currentKeyGroupsStateHandle.openInputStream();
+
rocksDBKeyedStateBackend.cancelStreamRegistry.registerCloseable(currentStateHandleInStream);
+ currentStateHandleInView = new
DataInputViewStreamWrapper(currentStateHandleInStream);
+ restoreKVStateMetaData();
+ restoreKVStateData();
+ } finally {
+ if
(rocksDBKeyedStateBackend.cancelStreamRegistry.unregisterCloseable(currentStateHandleInStream))
{
+
IOUtils.closeQuietly(currentStateHandleInStream);
}
}
- return null;
}
/**
- * 5) Release the snapshot object for RocksDB and clean up.
+ * Restore the KV-state / ColumnFamily meta data for all
key-groups referenced by the current state handle.
+ *
+ * @throws IOException
+ * @throws ClassNotFoundException
+ * @throws RocksDBException
*/
- public void releaseSnapshotResources() {
+ private void restoreKVStateMetaData() throws IOException,
StateMigrationException, RocksDBException {
- outStream = null;
+ KeyedBackendSerializationProxy<K> serializationProxy =
+ new
KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader);
- if (null != kvStateIterators) {
- for (Tuple2<RocksIterator, Integer>
kvStateIterator : kvStateIterators) {
-
IOUtils.closeQuietly(kvStateIterator.f0);
- }
- kvStateIterators = null;
- }
+ serializationProxy.read(currentStateHandleInView);
- if (null != snapshot) {
- if (null != stateBackend.db) {
-
stateBackend.db.releaseSnapshot(snapshot);
- }
- IOUtils.closeQuietly(snapshot);
- snapshot = null;
- }
+ // check for key serializer compatibility; this also
reconfigures the
+ // key serializer to be compatible, if it is required
and is possible
+ if (CompatibilityUtil.resolveCompatibilityResult(
+ serializationProxy.getKeySerializer(),
+ UnloadableDummyTypeSerializer.class,
+
serializationProxy.getKeySerializerConfigSnapshot(),
+ rocksDBKeyedStateBackend.keySerializer)
+ .isRequiresMigration()) {
- if (null != readOptions) {
- IOUtils.closeQuietly(readOptions);
- readOptions = null;
+ // TODO replace with state migration; note that
key hash codes need to remain the same after migration
+ throw new StateMigrationException("The new key
serializer is not compatible to read previous keys. " +
+ "Aborting now since state migration is
currently not available");
}
- this.dbLease.close();
- }
-
- private void writeKVStateMetaData() throws IOException {
+ this.keygroupStreamCompressionDecorator =
serializationProxy.isUsingKeyGroupCompression() ?
+ SnappyStreamCompressionDecorator.INSTANCE :
UncompressedStreamCompressionDecorator.INSTANCE;
- List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?,
?>> metaInfoSnapshots =
- new
ArrayList<>(stateBackend.kvStateInformation.size());
+ List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?,
?>> restoredMetaInfos =
+ serializationProxy.getStateMetaInfoSnapshots();
+ currentStateHandleKVStateColumnFamilies = new
ArrayList<>(restoredMetaInfos.size());
+ //rocksDBKeyedStateBackend.restoredKvStateMetaInfos =
new HashMap<>(restoredMetaInfos.size());
- int kvStateId = 0;
- for (Map.Entry<String, Tuple2<ColumnFamilyHandle,
RegisteredKeyedBackendStateMetaInfo<?, ?>>> column :
- stateBackend.kvStateInformation.entrySet()) {
+ for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>
restoredMetaInfo : restoredMetaInfos) {
-
metaInfoSnapshots.add(column.getValue().f1.snapshot());
+ Tuple2<ColumnFamilyHandle,
RegisteredKeyedBackendStateMetaInfo<?, ?>> registeredColumn =
+
rocksDBKeyedStateBackend.kvStateInformation.get(restoredMetaInfo.getName());
- //retrieve iterator for this k/v states
- readOptions = new ReadOptions();
- readOptions.setSnapshot(snapshot);
+ if (registeredColumn == null) {
+ byte[] nameBytes =
restoredMetaInfo.getName().getBytes(ConfigConstants.DEFAULT_CHARSET);
- kvStateIterators.add(
- new
Tuple2<>(stateBackend.db.newIterator(column.getValue().f0, readOptions),
kvStateId));
+ ColumnFamilyDescriptor
columnFamilyDescriptor = new ColumnFamilyDescriptor(
+ nameBytes,
+
rocksDBKeyedStateBackend.columnOptions);
- ++kvStateId;
- }
+ RegisteredKeyedBackendStateMetaInfo<?,
?> stateMetaInfo =
+ new
RegisteredKeyedBackendStateMetaInfo<>(
+
restoredMetaInfo.getStateType(),
+
restoredMetaInfo.getName(),
+
restoredMetaInfo.getNamespaceSerializer(),
+
restoredMetaInfo.getStateSerializer());
- KeyedBackendSerializationProxy<K> serializationProxy =
- new KeyedBackendSerializationProxy<>(
- stateBackend.getKeySerializer(),
- metaInfoSnapshots,
-
!Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE,
stateBackend.keyGroupCompressionDecorator));
+
rocksDBKeyedStateBackend.restoredKvStateMetaInfos.put(restoredMetaInfo.getName(),
restoredMetaInfo);
- serializationProxy.write(outputView);
- }
+ ColumnFamilyHandle columnFamily =
rocksDBKeyedStateBackend.db.createColumnFamily(columnFamilyDescriptor);
- private void writeKVStateData() throws IOException,
InterruptedException {
+ registeredColumn = new
Tuple2<>(columnFamily, stateMetaInfo);
+
rocksDBKeyedStateBackend.kvStateInformation.put(stateMetaInfo.getName(),
registeredColumn);
- byte[] previousKey = null;
- byte[] previousValue = null;
- OutputStream kgOutStream = null;
- DataOutputView kgOutView = null;
+ } else {
+ // TODO with eager state registration
in place, check here for serializer migration strategies
+ }
+
currentStateHandleKVStateColumnFamilies.add(registeredColumn.f0);
+ }
+ }
- try {
- // Here we transfer ownership of RocksIterators
to the RocksDBMergeIterator
- try (RocksDBMergeIterator mergeIterator = new
RocksDBMergeIterator(
- kvStateIterators,
stateBackend.keyGroupPrefixBytes)) {
+ /**
+ * Restore the KV-state / ColumnFamily data for all key-groups
referenced by the current state handle.
+ *
+ * @throws IOException
+ * @throws RocksDBException
+ */
+ private void restoreKVStateData() throws IOException,
RocksDBException {
+ //for all key-groups in the current state handle...
+ for (Tuple2<Integer, Long> keyGroupOffset :
currentKeyGroupsStateHandle.getGroupRangeOffsets()) {
+ int keyGroup = keyGroupOffset.f0;
- // handover complete, null out to
prevent double close
- kvStateIterators = null;
+ // Check that restored key groups all belong to
the backend
+
Preconditions.checkState(rocksDBKeyedStateBackend.getKeyGroupRange().contains(keyGroup),
+ "The key group must belong to the
backend");
- //preamble: setup with first key-group
as our lookahead
- if (mergeIterator.isValid()) {
- //begin first key-group by
recording the offset
-
keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(),
outStream.getPos());
- //write the k/v-state id as
metadata
- kgOutStream =
stateBackend.keyGroupCompressionDecorator.decorateWithCompression(outStream);
- kgOutView = new
DataOutputViewStreamWrapper(kgOutStream);
+ long offset = keyGroupOffset.f1;
+ //not empty key-group?
+ if (0L != offset) {
+ currentStateHandleInStream.seek(offset);
+ try (InputStream compressedKgIn =
keygroupStreamCompressionDecorator.decorateWithCompression(currentStateHandleInStream))
{
+ DataInputViewStreamWrapper
compressedKgInputView = new DataInputViewStreamWrapper(compressedKgIn);
//TODO this could be aware of
keyGroupPrefixBytes and write only one byte if possible
-
kgOutView.writeShort(mergeIterator.kvStateId());
- previousKey =
mergeIterator.key();
- previousValue =
mergeIterator.value();
- mergeIterator.next();
+ int kvStateId =
compressedKgInputView.readShort();
+ ColumnFamilyHandle handle =
currentStateHandleKVStateColumnFamilies.get(kvStateId);
+ //insert all k/v pairs into DB
+ boolean keyGroupHasMoreKeys =
true;
+ while (keyGroupHasMoreKeys) {
+ byte[] key =
BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
+ byte[] value =
BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
+ if
(RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(key)) {
+ //clear the
signal bit in the key to make it ready for insertion again
+
RocksDBFullSnapshotOperation.clearMetaDataFollowsFlag(key);
+
rocksDBKeyedStateBackend.db.put(handle, key, value);
+ //TODO this
could be aware of keyGroupPrefixBytes and write only one byte if possible
+ kvStateId =
RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK
+ &
compressedKgInputView.readShort();
+ if
(RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK == kvStateId) {
+
keyGroupHasMoreKeys = false;
+ } else {
+ handle
= currentStateHandleKVStateColumnFamilies.get(kvStateId);
+ }
+ } else {
+
rocksDBKeyedStateBackend.db.put(handle, key, value);
+ }
+ }
}
+ }
+ }
+ }
+ }
- //main loop: write k/v pairs ordered by
(key-group, kv-state), thereby tracking key-group offsets.
- while (mergeIterator.isValid()) {
+ /**
+ * Encapsulates the process of restoring a RocksDBKeyedStateBackend
from an incremental snapshot.
+ */
+ private static class RocksDBIncrementalRestoreOperation<T> {
--- End diff --
Would it actually make sense to give these classes their own files? The
`RocksDBKeyedStateBackend` file is 2100 lines long which makes it quite hard to
handle.
---