This is an automated email from the ASF dual-hosted git repository.
frankchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 1d648968a1d Fix negative Kafka partition lag caused by inconsistent
current/latest offsets (#18750)
1d648968a1d is described below
commit 1d648968a1d3604958aa26dd51b00e2013add8e2
Author: Guowei Wu <[email protected]>
AuthorDate: Tue Mar 17 21:45:43 2026 +0800
Fix negative Kafka partition lag caused by inconsistent current/latest
offsets (#18750)
* Fix negative Kafka partition lag caused by inconsistent current/latest
offsets
* Address review comments
* Fix stale Javadoc and brittle tests after removing
latestSequenceFromStream
---
.../indexing/kafka/supervisor/KafkaSupervisor.java | 116 +++++++++++-------
.../seekablestream/supervisor/OffsetSnapshot.java | 89 ++++++++++++++
.../supervisor/OffsetSnapshotTest.java | 136 +++++++++++++++++++++
3 files changed, 296 insertions(+), 45 deletions(-)
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index e226db37676..e2f62ed8d75 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -54,6 +54,7 @@ import
org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamException;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
+import org.apache.druid.indexing.seekablestream.supervisor.OffsetSnapshot;
import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorReportPayload;
@@ -71,9 +72,10 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -95,11 +97,13 @@ public class KafkaSupervisor extends
SeekableStreamSupervisor<KafkaTopicPartitio
private static final Long END_OF_PARTITION = Long.MAX_VALUE;
private final Pattern pattern;
- private volatile Map<KafkaTopicPartition, Long> latestSequenceFromStream;
private volatile Map<KafkaTopicPartition, Long> partitionToTimeLag;
private final KafkaSupervisorSpec spec;
+ private final AtomicReference<OffsetSnapshot<KafkaTopicPartition, Long>>
offsetSnapshotRef = new AtomicReference<>(
+ OffsetSnapshot.of(Map.of(), Map.of()));
+
public KafkaSupervisor(
final TaskStorage taskStorage,
final TaskMaster taskMaster,
@@ -126,7 +130,6 @@ public class KafkaSupervisor extends
SeekableStreamSupervisor<KafkaTopicPartitio
this.pattern = getIoConfig().isMultiTopic() ?
Pattern.compile(getIoConfig().getStream()) : null;
}
-
@Override
protected RecordSupplier<KafkaTopicPartition, Long, KafkaRecordEntity>
setupRecordSupplier()
{
@@ -173,7 +176,8 @@ public class KafkaSupervisor extends
SeekableStreamSupervisor<KafkaTopicPartitio
)
{
KafkaSupervisorIOConfig ioConfig = spec.getIoConfig();
- Map<KafkaTopicPartition, Long> partitionLag =
getRecordLagPerPartitionInLatestSequences(getHighestCurrentOffsets());
+ OffsetSnapshot<KafkaTopicPartition, Long> offsetSnapshot =
offsetSnapshotRef.get();
+ Map<KafkaTopicPartition, Long> partitionLag =
getRecordLagPerPartitionInLatestSequences(offsetSnapshot);
return new KafkaSupervisorReportPayload(
spec.getId(),
spec.getDataSchema().getDataSource(),
@@ -181,7 +185,7 @@ public class KafkaSupervisor extends
SeekableStreamSupervisor<KafkaTopicPartitio
numPartitions,
ioConfig.getReplicas(),
ioConfig.getTaskDuration().getMillis() / 1000,
- includeOffsets ? latestSequenceFromStream : null,
+ includeOffsets ? offsetSnapshot.getLatestOffsetsFromStream() : null,
includeOffsets ? partitionLag : null,
includeOffsets ? getPartitionTimeLag() : null,
includeOffsets ? aggregatePartitionLags(partitionLag).getTotalLag() :
null,
@@ -264,14 +268,16 @@ public class KafkaSupervisor extends
SeekableStreamSupervisor<KafkaTopicPartitio
@Override
protected Map<KafkaTopicPartition, Long> getPartitionRecordLag()
{
- Map<KafkaTopicPartition, Long> highestCurrentOffsets =
getHighestCurrentOffsets();
+ OffsetSnapshot<KafkaTopicPartition, Long> offsetSnapshot =
offsetSnapshotRef.get();
+ Map<KafkaTopicPartition, Long> latestSequencesFromStream =
offsetSnapshot.getLatestOffsetsFromStream();
+ Map<KafkaTopicPartition, Long> highestIngestedOffsets =
offsetSnapshot.getHighestIngestedOffsets();
- if (latestSequenceFromStream == null) {
+ if (latestSequencesFromStream.isEmpty()) {
return null;
}
- Set<KafkaTopicPartition> kafkaPartitions =
latestSequenceFromStream.keySet();
- Set<KafkaTopicPartition> taskPartitions = highestCurrentOffsets.keySet();
+ Set<KafkaTopicPartition> kafkaPartitions =
latestSequencesFromStream.keySet();
+ Set<KafkaTopicPartition> taskPartitions = highestIngestedOffsets.keySet();
if (!kafkaPartitions.equals(taskPartitions)) {
try {
log.warn("Mismatched kafka and task partitions: Missing Task
Partitions %s, Missing Kafka Partitions %s",
@@ -284,7 +290,7 @@ public class KafkaSupervisor extends
SeekableStreamSupervisor<KafkaTopicPartitio
}
}
- return getRecordLagPerPartitionInLatestSequences(highestCurrentOffsets);
+ return getRecordLagPerPartitionInLatestSequences(offsetSnapshot);
}
@Nullable
@@ -297,44 +303,48 @@ public class KafkaSupervisor extends
SeekableStreamSupervisor<KafkaTopicPartitio
// suppress use of CollectionUtils.mapValues() since the valueMapper function is dependent on map key here
@SuppressWarnings("SSBasedInspection")
// Used while calculating cummulative lag for entire stream
- private Map<KafkaTopicPartition, Long>
getRecordLagPerPartitionInLatestSequences(Map<KafkaTopicPartition, Long>
currentOffsets)
+ private Map<KafkaTopicPartition, Long>
getRecordLagPerPartitionInLatestSequences(OffsetSnapshot<KafkaTopicPartition,
Long> offsetSnapshot)
{
- if (latestSequenceFromStream == null) {
- return Collections.emptyMap();
- }
-
- return latestSequenceFromStream
- .entrySet()
- .stream()
- .collect(
- Collectors.toMap(
- Entry::getKey,
- e -> e.getValue() != null
- ? e.getValue() -
Optional.ofNullable(currentOffsets.get(e.getKey())).orElse(0L)
- : 0
- )
- );
+ return offsetSnapshot.getLatestOffsetsFromStream()
+ .entrySet()
+ .stream()
+ .collect(Collectors.toMap(
+ Entry::getKey,
+ e -> e.getValue() -
offsetSnapshot.getHighestIngestedOffsets().getOrDefault(e.getKey(), 0L)
+ ));
}
+ /**
+ * Computes the record lag for the specified partitions.
+ * <p>
+ * This method is called by the parent class to calculate lag only for the partitions present in
+ * {@code currentOffsets}. The values in {@code currentOffsets} are ignored; only the keys (partitions)
+ * are used to determine which partitions to compute lag for.
+ * <p>
+ * Lag is calculated as {@code latestOffsetFromStream - highestIngestedOffset} using the current atomic
+ * {@link OffsetSnapshot} to ensure consistency between the two offset maps.
+ *
+ * @param currentOffsets map whose keys indicate the partitions to compute lag for (values are ignored)
+ * @return map of partition to its record lag, or empty map if no valid data is available
+ */
@Override
protected Map<KafkaTopicPartition, Long>
getRecordLagPerPartition(Map<KafkaTopicPartition, Long> currentOffsets)
{
- if (latestSequenceFromStream == null || currentOffsets == null) {
+ OffsetSnapshot<KafkaTopicPartition, Long> offsetSnapshot =
offsetSnapshotRef.get();
+ Map<KafkaTopicPartition, Long> latestSequencesFromStream =
offsetSnapshot.getLatestOffsetsFromStream();
+ Map<KafkaTopicPartition, Long> highestIngestedOffsets =
offsetSnapshot.getHighestIngestedOffsets();
+
+ if (latestSequencesFromStream.isEmpty() || currentOffsets == null) {
return Collections.emptyMap();
}
- return currentOffsets
- .entrySet()
- .stream()
- .filter(e -> latestSequenceFromStream.get(e.getKey()) != null)
- .collect(
- Collectors.toMap(
- Entry::getKey,
- e -> e.getValue() != null
- ? latestSequenceFromStream.get(e.getKey()) - e.getValue()
- : 0
- )
- );
+ return currentOffsets.keySet().stream()
+ .filter(latestSequencesFromStream::containsKey)
+ .collect(Collectors.toMap(
+ Function.identity(),
+ // compute the lag using offsets from the snapshot.
+ p -> latestSequencesFromStream.get(p) -
highestIngestedOffsets.getOrDefault(p, 0L)
+ ));
}
@Override
@@ -439,7 +449,7 @@ public class KafkaSupervisor extends
SeekableStreamSupervisor<KafkaTopicPartitio
yetToReadPartitions.forEach(p -> lastIngestedTimestamps.put(p, 0L));
recordSupplier.seekToLatest(partitions);
- latestSequenceFromStream =
recordSupplier.getLatestSequenceNumbers(partitions);
+ Map<KafkaTopicPartition, Long> latestSequenceFromStream =
recordSupplier.getLatestSequenceNumbers(partitions);
for (Map.Entry<KafkaTopicPartition, Long> entry :
latestSequenceFromStream.entrySet()) {
// if there are no messages .getEndOffset would return 0, but if there are n msgs it would return n+1
@@ -457,6 +467,8 @@ public class KafkaSupervisor extends
SeekableStreamSupervisor<KafkaTopicPartitio
e -> e.getValue() - lastIngestedTimestamps.get(e.getKey())
)
);
+
+ updateOffsetSnapshot(highestCurrentOffsets, latestSequenceFromStream);
}
catch (InterruptedException e) {
throw new StreamException(e);
@@ -497,9 +509,11 @@ public class KafkaSupervisor extends
SeekableStreamSupervisor<KafkaTopicPartitio
}
/**
- * Fetches the latest offsets from the Kafka stream and updates the map
- * {@link #latestSequenceFromStream}. The actual lag is computed lazily in
- * {@link #getPartitionRecordLag}.
+ * Fetches latest stream offsets, combines with the highest ingested offsets into a snapshot,
+ * and atomically updates {@link #offsetSnapshotRef}.
+ * <p>
+ * Lag is computed consistently from the snapshot in downstream methods.
+ * </p>
*/
@Override
protected void updatePartitionLagFromStream()
@@ -509,6 +523,8 @@ public class KafkaSupervisor extends
SeekableStreamSupervisor<KafkaTopicPartitio
return;
}
+ Map<KafkaTopicPartition, Long> highestCurrentOffsets =
getHighestCurrentOffsets();
+
getRecordSupplierLock().lock();
try {
Set<KafkaTopicPartition> partitionIds;
@@ -527,8 +543,10 @@ public class KafkaSupervisor extends
SeekableStreamSupervisor<KafkaTopicPartitio
recordSupplier.seekToLatest(partitions);
- latestSequenceFromStream =
+ Map<KafkaTopicPartition, Long> latestSequenceFromStream =
partitions.stream().collect(Collectors.toMap(StreamPartition::getPartitionId,
recordSupplier::getPosition));
+
+ updateOffsetSnapshot(highestCurrentOffsets, latestSequenceFromStream);
}
catch (InterruptedException e) {
throw new StreamException(e);
@@ -538,10 +556,18 @@ public class KafkaSupervisor extends
SeekableStreamSupervisor<KafkaTopicPartitio
}
}
+ private void updateOffsetSnapshot(
+ Map<KafkaTopicPartition, Long> highestIngestedOffsets,
+ Map<KafkaTopicPartition, Long> latestOffsetsFromStream
+ )
+ {
+ offsetSnapshotRef.set(OffsetSnapshot.of(highestIngestedOffsets,
latestOffsetsFromStream));
+ }
+
@Override
protected Map<KafkaTopicPartition, Long> getLatestSequencesFromStream()
{
- return latestSequenceFromStream != null ? latestSequenceFromStream : new
HashMap<>();
+ return offsetSnapshotRef.get().getLatestOffsetsFromStream();
}
@Override
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/OffsetSnapshot.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/OffsetSnapshot.java
new file mode 100644
index 00000000000..ba1df930fbb
--- /dev/null
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/OffsetSnapshot.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+
+import javax.annotation.Nullable;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Immutable snapshot containing a consistent pair of offset maps: the highest ingested offsets
+ * reported by tasks and the latest end offsets fetched from the underlying stream.
+ *
+ * <p>
+ * The supervisor fetches task-reported ingested offsets first, then fetches end offsets from the stream.
+ * Because these two values are captured at slightly different instants, the reported lag
+ * (latestOffsetsFromStream - highestIngestedOffsets) may be slightly larger than the actual lag at any
+ * precise moment.
+ *
+ * <p>
+ * By publishing both maps together as a single atomic snapshot (using {@link java.util.concurrent.atomic.AtomicReference}),
+ * readers (such as lag metrics and supervisor status) always observe a coherent and consistent view.
+ * This produces stable, non-negative lag values over time, avoiding artifacts like temporary negative lags.
+ *
+ * <p>
+ * This class is generic and can be reused by all seekable-stream supervisors (Kafka, Kinesis, etc.).
+ */
+public final class OffsetSnapshot<PartitionIdType, SequenceOffsetType>
+{
+ private final ImmutableMap<PartitionIdType, SequenceOffsetType>
highestIngestedOffsets;
+ private final ImmutableMap<PartitionIdType, SequenceOffsetType>
latestOffsetsFromStream;
+
+ private OffsetSnapshot(
+ @Nullable Map<PartitionIdType, SequenceOffsetType>
highestIngestedOffsets,
+ @Nullable Map<PartitionIdType, SequenceOffsetType>
latestOffsetsFromStream
+ )
+ {
+ this.highestIngestedOffsets = toImmutableOffsetMap(highestIngestedOffsets);
+ this.latestOffsetsFromStream =
toImmutableOffsetMap(latestOffsetsFromStream);
+ }
+
+ public static <PartitionIdType, SequenceOffsetType>
OffsetSnapshot<PartitionIdType, SequenceOffsetType> of(
+ @Nullable Map<PartitionIdType, SequenceOffsetType> currentOffsets,
+ @Nullable Map<PartitionIdType, SequenceOffsetType> endOffsets
+ )
+ {
+ return new OffsetSnapshot<>(currentOffsets, endOffsets);
+ }
+
+ private ImmutableMap<PartitionIdType, SequenceOffsetType>
toImmutableOffsetMap(
+ @Nullable Map<PartitionIdType, SequenceOffsetType> input
+ )
+ {
+ if (input == null) {
+ return ImmutableMap.of();
+ }
+
+ return ImmutableMap.copyOf(Maps.filterValues(input, Objects::nonNull));
+ }
+
+ public ImmutableMap<PartitionIdType, SequenceOffsetType>
getHighestIngestedOffsets()
+ {
+ return highestIngestedOffsets;
+ }
+
+ public ImmutableMap<PartitionIdType, SequenceOffsetType>
getLatestOffsetsFromStream()
+ {
+ return latestOffsetsFromStream;
+ }
+}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/OffsetSnapshotTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/OffsetSnapshotTest.java
new file mode 100644
index 00000000000..603d8648ba2
--- /dev/null
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/OffsetSnapshotTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public class OffsetSnapshotTest
+{
+ @Test
+ public void testOffsetSnapshot_emptyInputReturnsEmptyMap()
+ {
+ OffsetSnapshot<String, Long> snapshot = OffsetSnapshot.of(
+ Collections.emptyMap(),
+ Collections.emptyMap()
+ );
+
+ Assert.assertTrue(snapshot.getHighestIngestedOffsets().isEmpty());
+ Assert.assertEquals(ImmutableMap.of(),
snapshot.getHighestIngestedOffsets());
+ Assert.assertTrue(snapshot.getLatestOffsetsFromStream().isEmpty());
+ Assert.assertEquals(ImmutableMap.of(),
snapshot.getLatestOffsetsFromStream());
+ }
+
+ @Test
+ public void testOffsetSnapshot_nullInputsReturnEmptyMaps()
+ {
+ OffsetSnapshot<Integer, Long> snapshot = OffsetSnapshot.of(null, null);
+
+ Assert.assertTrue(snapshot.getHighestIngestedOffsets().isEmpty());
+ Assert.assertEquals(ImmutableMap.of(),
snapshot.getHighestIngestedOffsets());
+ Assert.assertTrue(snapshot.getLatestOffsetsFromStream().isEmpty());
+ Assert.assertEquals(ImmutableMap.of(),
snapshot.getLatestOffsetsFromStream());
+ }
+
+ @Test
+ public void testOffsetSnapshot_nullCurrentOffsetsReturnsEmptyCurrentMap()
+ {
+ Map<Integer, Long> endOffsets = ImmutableMap.of(0, 100L, 1, 200L);
+
+ OffsetSnapshot<Integer, Long> snapshot = OffsetSnapshot.of(null,
endOffsets);
+
+ Assert.assertTrue(snapshot.getHighestIngestedOffsets().isEmpty());
+ Assert.assertEquals(ImmutableMap.of(),
snapshot.getHighestIngestedOffsets());
+ Assert.assertEquals(endOffsets, snapshot.getLatestOffsetsFromStream());
+ }
+
+ @Test
+ public void testOffsetSnapshot_nullEndOffsetsReturnsEmptyEndMap()
+ {
+ Map<Integer, Long> currentOffsets = ImmutableMap.of(0, 50L, 1, 150L);
+
+ OffsetSnapshot<Integer, Long> snapshot = OffsetSnapshot.of(currentOffsets,
null);
+
+ Assert.assertEquals(currentOffsets, snapshot.getHighestIngestedOffsets());
+ Assert.assertTrue(snapshot.getLatestOffsetsFromStream().isEmpty());
+ Assert.assertEquals(ImmutableMap.of(),
snapshot.getLatestOffsetsFromStream());
+ }
+
+ @Test
+ public void testOffsetSnapshot_copiesInputMapsAndReturnsImmutableCopies()
+ {
+ Map<String, String> current = new HashMap<>();
+ current.put("p0", "100");
+ current.put("p1", "200");
+
+ Map<String, String> end = new HashMap<>();
+ end.put("p0", "150");
+ end.put("p2", "300");
+
+ OffsetSnapshot<String, String> snapshot = OffsetSnapshot.of(current, end);
+
+ Assert.assertEquals(Map.of("p0", "100", "p1", "200"),
snapshot.getHighestIngestedOffsets());
+ Assert.assertEquals(Map.of("p0", "150", "p2", "300"),
snapshot.getLatestOffsetsFromStream());
+ }
+
+ @Test
+ public void testOffsetSnapshot_filtersOutNullValues()
+ {
+ Map<Integer, Long> current = new HashMap<>();
+ current.put(0, 100L);
+ current.put(1, null); // should be filtered
+ current.put(2, 200L);
+
+ Map<Integer, Long> end = new HashMap<>();
+ end.put(0, null); // should be filtered
+ end.put(1, 300L);
+
+ OffsetSnapshot<Integer, Long> snapshot = OffsetSnapshot.of(current, end);
+
+ Assert.assertEquals(Map.of(0, 100L, 2, 200L),
snapshot.getHighestIngestedOffsets());
+ Assert.assertEquals(2, snapshot.getHighestIngestedOffsets().size());
+
+ Assert.assertEquals(Map.of(1, 300L),
snapshot.getLatestOffsetsFromStream());
+ Assert.assertEquals(1, snapshot.getLatestOffsetsFromStream().size());
+ }
+
+ @Test
+ public void testOffsetSnapshot_snapshotIndependentFromInputMapMutations()
+ {
+ Map<Integer, Long> inputCurrent = new HashMap<>();
+ inputCurrent.put(1, 10L);
+ Map<Integer, Long> inputEnd = new HashMap<>();
+ inputEnd.put(2, 20L);
+
+ OffsetSnapshot<Integer, Long> snapshot = OffsetSnapshot.of(inputCurrent,
inputEnd);
+
+ // Mutate inputs after snapshot creation
+ inputCurrent.clear();
+ inputEnd.put(999, 999L);
+
+ Assert.assertEquals(Map.of(1, 10L), snapshot.getHighestIngestedOffsets());
+ Assert.assertEquals(Map.of(2, 20L), snapshot.getLatestOffsetsFromStream());
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]