This is an automated email from the ASF dual-hosted git repository.
suneet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 8c02880d5f Emit metrics for distribution of number of rows per segment
(#12730)
8c02880d5f is described below
commit 8c02880d5f3e48d3e8189a240eae391eb6107862
Author: TSFenwick <[email protected]>
AuthorDate: Tue Jul 12 07:04:42 2022 -0700
Emit metrics for distribution of number of rows per segment (#12730)
* initial commit of bucket dimensions for metrics
return counts of segments that have rowcount in a bucket size for a
datasource
return average value of rowcount per segment in a datasource
added unit test
naming could use a lot of work
buckets right now are not finalized
added javadocs
altered metrics.md
* fix checkstyle issues
* addressed review comments
add monitor test
move added functionality to new monitor
update docs
* address comments
renamed monitor
handle tombstones better
update docs
added javadocs
* Add support for tombstones in the segment distribution
* undo changes to tombstone segmentizer factory
* fix accidental whitespacing changes
* address comments regarding metrics documentation
and rename variable to be more accurate
* fix tests
* fix checkstyle issues
* fix broken test
* undo removal of timeout
---
docs/configuration/index.md | 1 +
docs/operations/metrics.md | 2 +
.../org/apache/druid/server/SegmentManager.java | 51 ++++-
.../coordination/SegmentLoadDropHandler.java | 11 +
.../metrics/SegmentRowCountDistribution.java | 161 ++++++++++++++
.../druid/server/metrics/SegmentStatsMonitor.java | 105 ++++++++++
.../segment/loading/CacheTestSegmentLoader.java | 102 ++++++++-
.../apache/druid/server/SegmentManagerTest.java | 6 +-
.../server/SegmentManagerThreadSafetyTest.java | 6 +-
.../metrics/SegmentRowCountDistributionTest.java | 233 +++++++++++++++++++++
.../server/metrics/SegmentStatsMonitorTest.java | 214 +++++++++++++++++++
11 files changed, 885 insertions(+), 7 deletions(-)
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index a38f79ebf4..32cedc5e7f 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -380,6 +380,7 @@ Metric monitoring is an essential part of Druid operations.
The following monit
|`org.apache.druid.java.util.metrics.CgroupMemoryMonitor`|Reports memory
statistic as per the memory cgroup.|
|`org.apache.druid.server.metrics.EventReceiverFirehoseMonitor`|Reports how
many events have been queued in the EventReceiverFirehose.|
|`org.apache.druid.server.metrics.HistoricalMetricsMonitor`|Reports statistics
on Historical processes. Available only on Historical processes.|
+|`org.apache.druid.server.metrics.SegmentStatsMonitor` | **EXPERIMENTAL**
Reports statistics about segments on Historical processes. Available only on
Historical processes. Not to be used when lazy loading is configured.
|
|`org.apache.druid.server.metrics.QueryCountStatsMonitor`|Reports how many
queries have been successful/failed/interrupted.|
|`org.apache.druid.server.emitter.HttpEmittingMonitor`|Reports internal
metrics of `http` or `parametrized` emitter (see below). Must not be used with
another emitter type. See the description of the metrics here:
https://github.com/apache/druid/pull/4973.|
|`org.apache.druid.server.metrics.TaskCountStatsMonitor`|Reports how many
ingestion tasks are currently running/pending/waiting and also the number of
successful/failed tasks per emission period.|
diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index 59fbb9f908..e427532cc7 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -323,6 +323,8 @@ decisions.
|`segment/usedPercent`|Percentage of space used by served
segments.|dataSource, tier, priority.|< 100%|
|`segment/count`|Number of served segments.|dataSource, tier,
priority.|Varies.|
|`segment/pendingDelete`|On-disk size in bytes of segments that are waiting to
be cleared out|Varies.|
+|`segment/rowCount/avg`| The average number of rows per segment on a
historical. `SegmentStatsMonitor` must be enabled.| dataSource, tier,
priority.|Varies. See [segment
optimization](../operations/segment-optimization.md) for guidance on optimal
segment sizes. |
+|`segment/rowCount/range/count`| The number of segments in a bucket.
`SegmentStatsMonitor` must be enabled.| dataSource, tier, priority,
range.|Varies.|
### JVM
diff --git a/server/src/main/java/org/apache/druid/server/SegmentManager.java
b/server/src/main/java/org/apache/druid/server/SegmentManager.java
index 329fc16248..ff509272a8 100644
--- a/server/src/main/java/org/apache/druid/server/SegmentManager.java
+++ b/server/src/main/java/org/apache/druid/server/SegmentManager.java
@@ -31,10 +31,12 @@ import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.SegmentLazyLoadFailCallback;
+import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.join.table.IndexedTable;
import org.apache.druid.segment.join.table.ReferenceCountingIndexedTable;
import org.apache.druid.segment.loading.SegmentLoader;
import org.apache.druid.segment.loading.SegmentLoadingException;
+import org.apache.druid.server.metrics.SegmentRowCountDistribution;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.VersionedIntervalTimeline;
@@ -74,17 +76,31 @@ public class SegmentManager
private final ConcurrentHashMap<SegmentId, ReferenceCountingIndexedTable>
tablesLookup = new ConcurrentHashMap<>();
private long totalSegmentSize;
private long numSegments;
+ private long rowCount;
+ private final SegmentRowCountDistribution segmentRowCountDistribution =
new SegmentRowCountDistribution();
- private void addSegment(DataSegment segment)
+ private void addSegment(DataSegment segment, long numOfRows)
{
totalSegmentSize += segment.getSize();
numSegments++;
+ rowCount += (numOfRows);
+ if (segment.isTombstone()) {
+ segmentRowCountDistribution.addTombstoneToDistribution();
+ } else {
+ segmentRowCountDistribution.addRowCountToDistribution(numOfRows);
+ }
}
- private void removeSegment(DataSegment segment)
+ private void removeSegment(DataSegment segment, long numOfRows)
{
totalSegmentSize -= segment.getSize();
numSegments--;
+ rowCount -= numOfRows;
+ if (segment.isTombstone()) {
+ segmentRowCountDistribution.removeTombstoneFromDistribution();
+ } else {
+ segmentRowCountDistribution.removeRowCountFromDistribution(numOfRows);
+ }
}
public VersionedIntervalTimeline<String, ReferenceCountingSegment>
getTimeline()
@@ -97,6 +113,11 @@ public class SegmentManager
return tablesLookup;
}
+ public long getAverageRowCount()
+ {
+ return numSegments == 0 ? 0 : rowCount / numSegments;
+ }
+
public long getTotalSegmentSize()
{
return totalSegmentSize;
@@ -111,8 +132,14 @@ public class SegmentManager
{
return numSegments == 0;
}
+
+ private SegmentRowCountDistribution getSegmentRowCountDistribution()
+ {
+ return segmentRowCountDistribution;
+ }
}
+
@Inject
public SegmentManager(
SegmentLoader segmentLoader
@@ -138,6 +165,16 @@ public class SegmentManager
return CollectionUtils.mapValues(dataSources,
SegmentManager.DataSourceState::getTotalSegmentSize);
}
+ public Map<String, Long> getAverageRowCountForDatasource()
+ {
+ return CollectionUtils.mapValues(dataSources,
SegmentManager.DataSourceState::getAverageRowCount);
+ }
+
+ public Map<String, SegmentRowCountDistribution> getRowCountDistribution()
+ {
+ return CollectionUtils.mapValues(dataSources,
SegmentManager.DataSourceState::getSegmentRowCountDistribution);
+ }
+
public Set<String> getDataSourceNames()
{
return dataSources.keySet();
@@ -265,7 +302,9 @@ public class SegmentManager
segment.getVersion(),
segment.getShardSpec().createChunk(adapter)
);
- dataSourceState.addSegment(segment);
+ StorageAdapter storageAdapter = adapter.asStorageAdapter();
+ long numOfRows = (segment.isTombstone() || storageAdapter == null)
? 0 : storageAdapter.getNumRows();
+ dataSourceState.addSegment(segment, numOfRows);
// Asyncly load segment index files into page cache in a thread
pool
segmentLoader.loadSegmentIntoPageCache(segment,
loadSegmentIntoPageCacheExec);
resultSupplier.set(true);
@@ -321,9 +360,13 @@ public class SegmentManager
);
final ReferenceCountingSegment oldQueryable = (removed == null) ?
null : removed.getObject();
+
if (oldQueryable != null) {
try (final Closer closer = Closer.create()) {
- dataSourceState.removeSegment(segment);
+ StorageAdapter storageAdapter =
oldQueryable.asStorageAdapter();
+ long numOfRows = (segment.isTombstone() || storageAdapter ==
null) ? 0 : storageAdapter.getNumRows();
+ dataSourceState.removeSegment(segment, numOfRows);
+
closer.register(oldQueryable);
log.info("Attempting to close segment %s", segment.getId());
final ReferenceCountingIndexedTable oldTable =
dataSourceState.tablesLookup.remove(segment.getId());
diff --git
a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
index 5d4bf76018..ba918d8fbd 100644
---
a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
+++
b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
@@ -45,6 +45,7 @@ import org.apache.druid.segment.loading.SegmentCacheManager;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.server.SegmentManager;
+import org.apache.druid.server.metrics.SegmentRowCountDistribution;
import org.apache.druid.timeline.DataSegment;
import javax.annotation.Nullable;
@@ -306,6 +307,16 @@ public class SegmentLoadDropHandler implements
DataSegmentChangeHandler
}
}
+ public Map<String, Long> getAverageNumOfRowsPerSegmentForDatasource()
+ {
+ return segmentManager.getAverageRowCountForDatasource();
+ }
+
+ public Map<String, SegmentRowCountDistribution>
getRowCountDistributionPerDatasource()
+ {
+ return segmentManager.getRowCountDistribution();
+ }
+
@Override
public void addSegment(DataSegment segment, @Nullable
DataSegmentChangeCallback callback)
{
diff --git
a/server/src/main/java/org/apache/druid/server/metrics/SegmentRowCountDistribution.java
b/server/src/main/java/org/apache/druid/server/metrics/SegmentRowCountDistribution.java
new file mode 100644
index 0000000000..337df6384d
--- /dev/null
+++
b/server/src/main/java/org/apache/druid/server/metrics/SegmentRowCountDistribution.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.metrics;
+
+import org.apache.druid.java.util.emitter.EmittingLogger;
+
+import java.util.function.ObjIntConsumer;
+
+/**
+ * Class that creates a count of segments that have row counts in certain
buckets
+ */
+public class SegmentRowCountDistribution
+{
+ private static final EmittingLogger log = new
EmittingLogger(SegmentRowCountDistribution.class);
+
+ private final int[] buckets = new int[9];
+ private static final int TOMBSTONE_BUCKET_INDEX = 0;
+
+ /**
+ * Increments the count for a particular bucket held in this class
+ *
+ * @param rowCount the number of rows to figure out which bucket to increment
+ */
+ public void addRowCountToDistribution(long rowCount)
+ {
+ int bucketIndex = determineBucketFromRowCount(rowCount);
+ buckets[bucketIndex]++;
+ }
+
+ /**
+ * Decrements the count for a particular bucket held in this class
+ *
+ * @param rowCount the count which determines which bucket to decrement
+ */
+ public void removeRowCountFromDistribution(long rowCount)
+ {
+ int bucketIndex = determineBucketFromRowCount(rowCount);
+ buckets[bucketIndex]--;
+ if (buckets[bucketIndex] < 0) {
+ // can this ever go negative?
+ log.error("somehow got a count of less than 0, resetting value to 0");
+ buckets[bucketIndex] = 0;
+ }
+ }
+
+ /**
+ * Increments the count for number of tombstones in the distribution
+ */
+ public void addTombstoneToDistribution()
+ {
+ buckets[TOMBSTONE_BUCKET_INDEX]++;
+ }
+
+ /**
+ * Decrements the count for the number of tombstones in he distribution.
+ */
+ public void removeTombstoneFromDistribution()
+ {
+ buckets[TOMBSTONE_BUCKET_INDEX]--;
+ }
+
+ /**
+ * Determines the name of the dimension used for a bucket. Should never
return NA as this isn't public and this
+ * method is private to this class
+ *
+ * @param index the index of the bucket
+ * @return the dimension which the bucket index refers to
+ */
+ private static String getBucketDimensionFromIndex(int index)
+ {
+ switch (index) {
+ case 0:
+ return "Tombstone";
+ case 1:
+ return "0";
+ case 2:
+ return "1-10k";
+ case 3:
+ return "10k-2M";
+ case 4:
+ return "2M-4M";
+ case 5:
+ return "4M-6M";
+ case 6:
+ return "6M-8M";
+ case 7:
+ return "8M-10M";
+ case 8:
+ return "10M+";
+ // should never get to default
+ default:
+ return "NA";
+ }
+ }
+
+ /**
+ * Figures out which bucket the specified rowCount belongs to
+ *
+ * @param rowCount the number of rows in a segment
+ * @return the bucket index
+ */
+ private static int determineBucketFromRowCount(long rowCount)
+ {
+ // 0 indexed bucket is reserved for tombstones
+ if (rowCount <= 0L) {
+ return 1;
+ }
+ if (rowCount <= 10_000L) {
+ return 2;
+ }
+ if (rowCount <= 2_000_000L) {
+ return 3;
+ }
+ if (rowCount <= 4_000_000L) {
+ return 4;
+ }
+ if (rowCount <= 6_000_000L) {
+ return 5;
+ }
+ if (rowCount <= 8_000_000L) {
+ return 6;
+ }
+ if (rowCount <= 10_000_000L) {
+ return 7;
+ }
+ return 8;
+ }
+
+ /**
+ * Gives the consumer the range dimension and the associated count. Will not
give zero range unless there is a count there.
+ *
+ * @param consumer
+ */
+ public void forEachDimension(final ObjIntConsumer<String> consumer)
+ {
+ for (int ii = 0; ii < buckets.length; ii++) {
+ // only report tombstones and 0 bucket if it has nonzero value
+ if (ii > 1 || buckets[ii] != 0) {
+ consumer.accept(getBucketDimensionFromIndex(ii), buckets[ii]);
+ }
+ }
+ }
+
+}
diff --git
a/server/src/main/java/org/apache/druid/server/metrics/SegmentStatsMonitor.java
b/server/src/main/java/org/apache/druid/server/metrics/SegmentStatsMonitor.java
new file mode 100644
index 0000000000..c72aba2957
--- /dev/null
+++
b/server/src/main/java/org/apache/druid/server/metrics/SegmentStatsMonitor.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.druid.server.metrics;
+
+
+import com.google.inject.Inject;
+import org.apache.druid.client.DruidServerConfig;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.java.util.metrics.AbstractMonitor;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.segment.loading.SegmentLoaderConfig;
+import org.apache.druid.server.coordination.SegmentLoadDropHandler;
+
+import java.util.Map;
+
+/**
+ * An experimental monitor used to keep track of segment stats. Can only be
used on a historical and cannot be used with lazy loading.
+ *
+ * It keeps track of the average number of rows in a segment and the
distribution of segments according to rowCount.
+ */
+public class SegmentStatsMonitor extends AbstractMonitor
+{
+ private final DruidServerConfig serverConfig;
+ private final SegmentLoadDropHandler segmentLoadDropHandler;
+
+ private static final Logger log = new Logger(SegmentStatsMonitor.class);
+
+ /**
+ * Constructor for this monitor. Will throw IllegalStateException if lazy
load on start is set to true.
+ *
+ * @param serverConfig
+ * @param segmentLoadDropHandler
+ * @param segmentLoaderConfig
+ */
+ @Inject
+ public SegmentStatsMonitor(
+ DruidServerConfig serverConfig,
+ SegmentLoadDropHandler segmentLoadDropHandler,
+ SegmentLoaderConfig segmentLoaderConfig
+ )
+ {
+ if (segmentLoaderConfig.isLazyLoadOnStart()) {
+ // log message ensures there is an error displayed at startup if this
fails as the exception isn't logged.
+ log.error("Monitor doesn't support working with lazy loading on start");
+ // throw this exception it kill the process at startup
+ throw new IllegalStateException("Monitor doesn't support working with
lazy loading on start");
+ }
+ this.serverConfig = serverConfig;
+ this.segmentLoadDropHandler = segmentLoadDropHandler;
+
+ }
+
+ @Override
+ public boolean doMonitor(ServiceEmitter emitter)
+ {
+ for (Map.Entry<String, Long> entry :
segmentLoadDropHandler.getAverageNumOfRowsPerSegmentForDatasource().entrySet())
{
+ String dataSource = entry.getKey();
+ long averageSize = entry.getValue();
+ final ServiceMetricEvent.Builder builder = new
ServiceMetricEvent.Builder()
+ .setDimension(DruidMetrics.DATASOURCE, dataSource)
+ .setDimension("tier", serverConfig.getTier())
+ .setDimension("priority",
String.valueOf(serverConfig.getPriority()));
+ emitter.emit(builder.build("segment/rowCount/avg", averageSize));
+ }
+
+ for (Map.Entry<String, SegmentRowCountDistribution> entry :
segmentLoadDropHandler.getRowCountDistributionPerDatasource()
+
.entrySet()) {
+ String dataSource = entry.getKey();
+ SegmentRowCountDistribution rowCountBucket = entry.getValue();
+
+ rowCountBucket.forEachDimension((final String bucketDimension, final int
count) -> {
+ final ServiceMetricEvent.Builder builder = new
ServiceMetricEvent.Builder()
+ .setDimension(DruidMetrics.DATASOURCE, dataSource)
+ .setDimension("tier", serverConfig.getTier())
+ .setDimension("priority",
String.valueOf(serverConfig.getPriority()));
+ builder.setDimension("range", bucketDimension);
+ ServiceEventBuilder<ServiceMetricEvent> output =
builder.build("segment/rowCount/range/count", count);
+ emitter.emit(output);
+ });
+ }
+
+ return true;
+ }
+}
diff --git
a/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java
b/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java
index 941b459c73..f80688276c 100644
---
a/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java
+++
b/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java
@@ -19,15 +19,26 @@
package org.apache.druid.segment.loading;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.query.QueryMetrics;
+import org.apache.druid.query.filter.Filter;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.Metadata;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.SegmentLazyLoadFailCallback;
import org.apache.druid.segment.StorageAdapter;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.data.Indexed;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
+import org.joda.time.DateTime;
import org.joda.time.Interval;
+import javax.annotation.Nullable;
import java.util.concurrent.ExecutorService;
/**
@@ -61,7 +72,96 @@ public class CacheTestSegmentLoader implements SegmentLoader
@Override
public StorageAdapter asStorageAdapter()
{
- throw new UnsupportedOperationException();
+ return new StorageAdapter()
+ {
+ @Override
+ public Interval getInterval()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Indexed<String> getAvailableDimensions()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Iterable<String> getAvailableMetrics()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getDimensionCardinality(String column)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public DateTime getMinTime()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public DateTime getMaxTime()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Nullable
+ @Override
+ public Comparable getMinValue(String column)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Nullable
+ @Override
+ public Comparable getMaxValue(String column)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Nullable
+ @Override
+ public ColumnCapabilities getColumnCapabilities(String column)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getNumRows()
+ {
+ return 1;
+ }
+
+ @Override
+ public DateTime getMaxIngestedEventTime()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Metadata getMetadata()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Sequence<Cursor> makeCursors(
+ @Nullable Filter filter,
+ Interval interval,
+ VirtualColumns virtualColumns,
+ Granularity gran,
+ boolean descending,
+ @Nullable QueryMetrics<?> queryMetrics
+ )
+ {
+ throw new UnsupportedOperationException();
+ }
+ };
}
@Override
diff --git
a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java
b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java
index a64078b4d7..9d1ac4c4e2 100644
--- a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java
+++ b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java
@@ -45,6 +45,7 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mockito;
import java.util.ArrayList;
import java.util.HashMap;
@@ -89,11 +90,14 @@ public class SegmentManagerTest
{
private final String version;
private final Interval interval;
+ private final StorageAdapter storageAdapter;
SegmentForTesting(String version, Interval interval)
{
this.version = version;
this.interval = interval;
+ storageAdapter = Mockito.mock(StorageAdapter.class);
+ Mockito.when(storageAdapter.getNumRows()).thenReturn(1);
}
public String getVersion()
@@ -127,7 +131,7 @@ public class SegmentManagerTest
@Override
public StorageAdapter asStorageAdapter()
{
- throw new UnsupportedOperationException();
+ return storageAdapter;
}
@Override
diff --git
a/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java
b/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java
index b1de3a22ee..27abe09161 100644
---
a/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java
+++
b/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java
@@ -56,6 +56,7 @@ import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
import javax.annotation.Nullable;
import java.io.File;
@@ -227,6 +228,8 @@ public class SegmentManagerThreadSafetyTest
{
return new Segment()
{
+ StorageAdapter storageAdapter = Mockito.mock(StorageAdapter.class);
+
@Override
public SegmentId getId()
{
@@ -249,7 +252,8 @@ public class SegmentManagerThreadSafetyTest
@Override
public StorageAdapter asStorageAdapter()
{
- throw new UnsupportedOperationException();
+ Mockito.when(storageAdapter.getNumRows()).thenReturn(1);
+ return storageAdapter;
}
@Override
diff --git
a/server/src/test/java/org/apache/druid/server/metrics/SegmentRowCountDistributionTest.java
b/server/src/test/java/org/apache/druid/server/metrics/SegmentRowCountDistributionTest.java
new file mode 100644
index 0000000000..b5ebfe0174
--- /dev/null
+++
b/server/src/test/java/org/apache/druid/server/metrics/SegmentRowCountDistributionTest.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.metrics;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.function.ObjIntConsumer;
+
+
+public class SegmentRowCountDistributionTest
+{
+
+ private SegmentRowCountDistribution rowCountBucket;
+
+ @Before
+ public void setUp()
+ {
+ rowCountBucket = new SegmentRowCountDistribution();
+ }
+
+ @Test
+ public void test_bucketCountSanity()
+ {
+ // test base case
+ rowCountBucket.forEachDimension((final String dimension, final int count)
-> {
+ Assert.assertEquals(0, count);
+ });
+
+ // tombstones
+ // add tombstones
+ rowCountBucket.addTombstoneToDistribution();
+ rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 0));
+ rowCountBucket.addTombstoneToDistribution();
+ rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(2, 0));
+ // remove tombstones
+ rowCountBucket.removeTombstoneFromDistribution();
+ rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 0));
+ rowCountBucket.removeTombstoneFromDistribution();
+ rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(0, 0));
+
+ // test bounds of 1st bucket
+ // with addition
+ rowCountBucket.addRowCountToDistribution(0);
+ rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 1));
+ rowCountBucket.addRowCountToDistribution(0);
+ rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(2, 1));
+ // with removal
+ rowCountBucket.removeRowCountFromDistribution(0);
+ rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 1));
+ rowCountBucket.removeRowCountFromDistribution(0);
+ rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(0, 1));
+
+ // test bounds of 2nd bucket
+ // with addition
+ rowCountBucket.addRowCountToDistribution(1);
+ rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 2));
+ rowCountBucket.addRowCountToDistribution(10_000);
+ rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(2, 2));
+ // with removal
+ rowCountBucket.removeRowCountFromDistribution(1);
+ rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 2));
+ rowCountBucket.removeRowCountFromDistribution(10_000);
+ rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(0, 2));
+
+ // test bounds of 3rd bucket
+ // with addition
+ rowCountBucket.addRowCountToDistribution(10_001);
+ rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 3));
+ rowCountBucket.addRowCountToDistribution(2_000_000);
+ rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(2, 3));
+ // with removal
+ rowCountBucket.removeRowCountFromDistribution(10_001);
+ rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 3));
+ rowCountBucket.removeRowCountFromDistribution(2_000_000);
+ rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(0, 3));
+
+ // test bounds of 4th bucket
+ // with addition
+ rowCountBucket.addRowCountToDistribution(2_000_001);
+ rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 4));
+ rowCountBucket.addRowCountToDistribution(4_000_000);
+ rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(2, 4));
+ // with removal
+ rowCountBucket.removeRowCountFromDistribution(2_000_001);
+ rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 4));
+
+ rowCountBucket.removeRowCountFromDistribution(4_000_000);
+ rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(0, 4));
+
+
+ // test bounds of 5th bucket
+ // with addition
+ rowCountBucket.addRowCountToDistribution(4_000_001);
+ rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 5));
+ rowCountBucket.addRowCountToDistribution(6_000_000);
+ rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(2, 5));
+ // with removal
+ rowCountBucket.removeRowCountFromDistribution(4_000_001);
+ rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 5));
+ rowCountBucket.removeRowCountFromDistribution(6_000_000);
+ rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(0, 5));
+
+ // test bounds of 6th bucket
+ // with addition
+ rowCountBucket.addRowCountToDistribution(6_000_001);
+ rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 6));
+ rowCountBucket.addRowCountToDistribution(8_000_000);
+ rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(2, 6));
+ // with removal
+ rowCountBucket.removeRowCountFromDistribution(6_000_001);
+ rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 6));
+ rowCountBucket.removeRowCountFromDistribution(8_000_000);
+ rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(0, 6));
+
+ // test bounds of 7th bucket
+ // with addition
+ rowCountBucket.addRowCountToDistribution(8_000_001);
+ rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 7));
+ rowCountBucket.addRowCountToDistribution(10_000_000);
+ rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(2, 7));
+ // with removal
+ rowCountBucket.removeRowCountFromDistribution(8_000_001);
+ rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 7));
+ rowCountBucket.removeRowCountFromDistribution(10_000_000);
+ rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(0, 7));
+
+ // test bounds of 8th bucket
+ // with addition
+ rowCountBucket.addRowCountToDistribution(10_000_001);
+ rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 8));
+ rowCountBucket.addRowCountToDistribution(Long.MAX_VALUE);
+ rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(2, 8));
+ // with removal
+ rowCountBucket.removeRowCountFromDistribution(10_000_001);
+ rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 8));
+ rowCountBucket.removeRowCountFromDistribution(Long.MAX_VALUE);
+ rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(0, 8));
+ }
+
+ // this is used to test part of the functionality in
AssertBucketHasValue.assertExpected
+ @Test
+ public void test_bucketDimensionFromIndex()
+ {
+ Assert.assertEquals("Tombstone", getBucketDimensionFromIndex(0));
+ Assert.assertEquals("0", getBucketDimensionFromIndex(1));
+ Assert.assertEquals("1-10k", getBucketDimensionFromIndex(2));
+ Assert.assertEquals("10k-2M", getBucketDimensionFromIndex(3));
+ Assert.assertEquals("2M-4M", getBucketDimensionFromIndex(4));
+ Assert.assertEquals("4M-6M", getBucketDimensionFromIndex(5));
+ Assert.assertEquals("6M-8M", getBucketDimensionFromIndex(6));
+ Assert.assertEquals("8M-10M", getBucketDimensionFromIndex(7));
+ Assert.assertEquals("10M+", getBucketDimensionFromIndex(8));
+ Assert.assertEquals("NA", getBucketDimensionFromIndex(9));
+ }
+
+ private static class AssertBucketHasValue implements ObjIntConsumer<String>
+ {
+
+ private final int expectedBucket;
+ private final int expectedValue;
+
+ private AssertBucketHasValue(int expectedBucket, int expectedValue)
+ {
+ this.expectedBucket = expectedBucket;
+ this.expectedValue = expectedValue;
+ }
+
+ static AssertBucketHasValue assertExpected(int expectedValue, int
expectedBucket)
+ {
+ return new AssertBucketHasValue(expectedBucket, expectedValue);
+ }
+
+ @Override
+ public void accept(String s, int value)
+ {
+ if (s.equals(getBucketDimensionFromIndex(expectedBucket))) {
+ Assert.assertEquals(expectedValue, value);
+ } else {
+ // assert all other values are empty
+ Assert.assertEquals(0, value);
+ }
+ }
+ }
+
+ // this is here because we didn't want to expose the internals of the
buckets for segment rowCount distributions
+ private static String getBucketDimensionFromIndex(int index)
+ {
+ switch (index) {
+ case 0:
+ return "Tombstone";
+ case 1:
+ return "0";
+ case 2:
+ return "1-10k";
+ case 3:
+ return "10k-2M";
+ case 4:
+ return "2M-4M";
+ case 5:
+ return "4M-6M";
+ case 6:
+ return "6M-8M";
+ case 7:
+ return "8M-10M";
+ case 8:
+ return "10M+";
+ // should never get to default
+ default:
+ return "NA";
+ }
+ }
+
+
+}
diff --git
a/server/src/test/java/org/apache/druid/server/metrics/SegmentStatsMonitorTest.java
b/server/src/test/java/org/apache/druid/server/metrics/SegmentStatsMonitorTest.java
new file mode 100644
index 0000000000..cb08a39fc3
--- /dev/null
+++
b/server/src/test/java/org/apache/druid/server/metrics/SegmentStatsMonitorTest.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.druid.server.metrics;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.client.DruidServerConfig;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.segment.loading.SegmentLoaderConfig;
+import org.apache.druid.server.coordination.SegmentLoadDropHandler;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import javax.annotation.Nonnull;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class SegmentStatsMonitorTest
+{
+ private static final String DATA_SOURCE = "dataSource";
+ private static final int PRIORITY = 111;
+ private static final String TIER = "tier";
+
+ private DruidServerConfig druidServerConfig;
+ private SegmentLoadDropHandler segmentLoadDropMgr;
+ private ServiceEmitter serviceEmitter;
+ private SegmentStatsMonitor monitor;
+ private final SegmentLoaderConfig segmentLoaderConfig = new
SegmentLoaderConfig();
+
+ @Before
+ public void setUp()
+ {
+ druidServerConfig = Mockito.mock(DruidServerConfig.class);
+ segmentLoadDropMgr = Mockito.mock(SegmentLoadDropHandler.class);
+ serviceEmitter = Mockito.mock(ServiceEmitter.class);
+ monitor = new SegmentStatsMonitor(
+ druidServerConfig,
+ segmentLoadDropMgr,
+ segmentLoaderConfig
+ );
+ Mockito.when(druidServerConfig.getTier()).thenReturn(TIER);
+ Mockito.when(druidServerConfig.getPriority()).thenReturn(PRIORITY);
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testLazyLoadOnStartThrowsException()
+ {
+ SegmentLoaderConfig segmentLoaderConfig =
Mockito.mock(SegmentLoaderConfig.class);
+ Mockito.when(segmentLoaderConfig.isLazyLoadOnStart()).thenReturn(true);
+
+ //should throw an exception here
+ new SegmentStatsMonitor(druidServerConfig, segmentLoadDropMgr,
segmentLoaderConfig);
+ }
+
+ @Test
+ public void testSimple()
+ {
+ final SegmentRowCountDistribution segmentRowCountDistribution = new
SegmentRowCountDistribution();
+ segmentRowCountDistribution.addRowCountToDistribution(100_000L);
+
+
Mockito.when(segmentLoadDropMgr.getAverageNumOfRowsPerSegmentForDatasource())
+ .thenReturn(ImmutableMap.of(DATA_SOURCE, 100_000L));
+ Mockito.when(segmentLoadDropMgr.getRowCountDistributionPerDatasource())
+ .thenReturn(ImmutableMap.of(DATA_SOURCE,
segmentRowCountDistribution));
+
+ ArgumentCaptor<ServiceEventBuilder<ServiceMetricEvent>>
eventArgumentCaptor = ArgumentCaptor.forClass(
+ ServiceEventBuilder.class);
+ monitor.doMonitor(serviceEmitter);
+ Mockito.verify(serviceEmitter,
Mockito.atLeastOnce()).emit(eventArgumentCaptor.capture());
+
+ List<Map<String, Object>> eventsAsMaps =
getEventMaps(eventArgumentCaptor.getAllValues());
+ Map<String, Map<String, Object>> actual = metricKeyedMap(eventsAsMaps);
+
+ List<ServiceEventBuilder<ServiceMetricEvent>> expectedEvents = new
ArrayList<>();
+ expectedEvents.add(averageRowCountEvent(100_000L));
+ expectedEvents.add(rowCountRangeEvent("1-10k", 0));
+ expectedEvents.add(rowCountRangeEvent("10k-2M", 1));
+ expectedEvents.add(rowCountRangeEvent("2M-4M", 0));
+ expectedEvents.add(rowCountRangeEvent("4M-6M", 0));
+ expectedEvents.add(rowCountRangeEvent("6M-8M", 0));
+ expectedEvents.add(rowCountRangeEvent("8M-10M", 0));
+ expectedEvents.add(rowCountRangeEvent("10M+", 0));
+
+ List<Map<String, Object>> expectedEventsAsMap =
getEventMaps(expectedEvents);
+ Map<String, Map<String, Object>> expected =
metricKeyedMap(expectedEventsAsMap);
+
+ Assert.assertEquals("different number of metrics were returned",
expected.size(), actual.size());
+ for (Map.Entry<String, Map<String, Object>> expectedKeyedEntry :
expected.entrySet()) {
+ Map<String, Object> actualValue =
actual.get(expectedKeyedEntry.getKey());
+ assertMetricMapsEqual(expectedKeyedEntry.getKey(),
expectedKeyedEntry.getValue(), actualValue);
+ }
+ }
+
+ @Test
+ public void testZeroAndTombstoneDistribution()
+ {
+ final SegmentRowCountDistribution segmentRowCountDistribution = new
SegmentRowCountDistribution();
+ segmentRowCountDistribution.addRowCountToDistribution(100_000L);
+ segmentRowCountDistribution.addRowCountToDistribution(0L);
+ segmentRowCountDistribution.addTombstoneToDistribution();
+ segmentRowCountDistribution.addTombstoneToDistribution();
+
+
Mockito.when(segmentLoadDropMgr.getAverageNumOfRowsPerSegmentForDatasource())
+ .thenReturn(ImmutableMap.of(DATA_SOURCE, 50_000L));
+ Mockito.when(segmentLoadDropMgr.getRowCountDistributionPerDatasource())
+ .thenReturn(ImmutableMap.of(DATA_SOURCE,
segmentRowCountDistribution));
+
+ ArgumentCaptor<ServiceEventBuilder<ServiceMetricEvent>>
eventArgumentCaptor = ArgumentCaptor.forClass(
+ ServiceEventBuilder.class);
+ monitor.doMonitor(serviceEmitter);
+ Mockito.verify(serviceEmitter,
Mockito.atLeastOnce()).emit(eventArgumentCaptor.capture());
+
+ List<Map<String, Object>> eventsAsMaps =
getEventMaps(eventArgumentCaptor.getAllValues());
+ Map<String, Map<String, Object>> actual = metricKeyedMap(eventsAsMaps);
+
+ List<ServiceEventBuilder<ServiceMetricEvent>> expectedEvents = new
ArrayList<>();
+ expectedEvents.add(averageRowCountEvent(50_000L));
+ expectedEvents.add(rowCountRangeEvent("0", 1));
+ expectedEvents.add(rowCountRangeEvent("Tombstone", 2));
+ expectedEvents.add(rowCountRangeEvent("1-10k", 0));
+ expectedEvents.add(rowCountRangeEvent("10k-2M", 1));
+ expectedEvents.add(rowCountRangeEvent("2M-4M", 0));
+ expectedEvents.add(rowCountRangeEvent("4M-6M", 0));
+ expectedEvents.add(rowCountRangeEvent("6M-8M", 0));
+ expectedEvents.add(rowCountRangeEvent("8M-10M", 0));
+ expectedEvents.add(rowCountRangeEvent("10M+", 0));
+
+ List<Map<String, Object>> expectedEventsAsMap =
getEventMaps(expectedEvents);
+ Map<String, Map<String, Object>> expected =
metricKeyedMap(expectedEventsAsMap);
+
+ Assert.assertEquals("different number of metrics were returned",
expected.size(), actual.size());
+ for (Map.Entry<String, Map<String, Object>> expectedKeyedEntry :
expected.entrySet()) {
+ Map<String, Object> actualValue =
actual.get(expectedKeyedEntry.getKey());
+ assertMetricMapsEqual(expectedKeyedEntry.getKey(),
expectedKeyedEntry.getValue(), actualValue);
+ }
+ }
+
+ private void assertMetricMapsEqual(String messagePrefix, Map<String, Object>
expected, Map<String, Object> actual)
+ {
+ Assert.assertEquals("different number of expected values for metrics",
expected.size(), actual.size());
+ for (Map.Entry<String, Object> expectedMetricEntry : expected.entrySet()) {
+ Assert.assertEquals(
+ messagePrefix + " " + expectedMetricEntry.getKey(),
+ expectedMetricEntry.getValue(),
+ actual.get(expectedMetricEntry.getKey())
+ );
+ }
+ }
+
+ @Nonnull
+ private List<Map<String, Object>>
getEventMaps(List<ServiceEventBuilder<ServiceMetricEvent>> eventBuilders)
+ {
+ return eventBuilders.stream()
+ .map(eventBuilder -> new
HashMap<>(eventBuilder.build(ImmutableMap.of()).toMap()))
+ .peek(mappedValues -> mappedValues.remove("timestamp"))
+ .collect(Collectors.toList());
+ }
+
+ private Map<String, Map<String, Object>> metricKeyedMap(List<Map<String,
Object>> eventsAsMaps)
+ {
+ return eventsAsMaps.stream()
+ .collect(
+ Collectors.toMap(eventasdf -> {
+ String metricName =
eventasdf.get("metric").toString();
+ String range = eventasdf.getOrDefault("range",
"").toString();
+ return metricName + range;
+ }, Function.identity())
+ );
+ }
+
+ private ServiceEventBuilder<ServiceMetricEvent> averageRowCountEvent(Number
value)
+ {
+ return new
ServiceMetricEvent.Builder().setDimension(DruidMetrics.DATASOURCE, DATA_SOURCE)
+ .setDimension("tier", TIER)
+ .setDimension("priority",
String.valueOf(PRIORITY))
+ .build("segment/rowCount/avg",
value);
+ }
+
+ private ServiceEventBuilder<ServiceMetricEvent> rowCountRangeEvent(String
range, Number value)
+ {
+ return new
ServiceMetricEvent.Builder().setDimension(DruidMetrics.DATASOURCE, DATA_SOURCE)
+ .setDimension("tier", TIER)
+ .setDimension("priority",
String.valueOf(PRIORITY))
+ .setDimension("range", range)
+
.build("segment/rowCount/range/count", value);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]