swaminathanmanish commented on code in PR #12806:
URL: https://github.com/apache/pinot/pull/12806#discussion_r1556650280
##########
pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java:
##########
@@ -69,113 +60,56 @@ public KinesisConsumer(KinesisConfig config, KinesisClient
kinesisClient) {
super(config, kinesisClient);
}
- /**
- * Fetch records from the Kinesis stream between the start and end
KinesisCheckpoint
- */
@Override
- public KinesisMessageBatch fetchMessages(StreamPartitionMsgOffset
startMsgOffset, int timeoutMs) {
+ public synchronized KinesisMessageBatch
fetchMessages(StreamPartitionMsgOffset startMsgOffset, int timeoutMs) {
KinesisPartitionGroupOffset startOffset = (KinesisPartitionGroupOffset)
startMsgOffset;
- List<BytesStreamMessage> messages = new ArrayList<>();
- Future<KinesisMessageBatch> kinesisFetchResultFuture =
- _executorService.submit(() -> getResult(startOffset, messages));
- try {
- return kinesisFetchResultFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
- } catch (TimeoutException e) {
- kinesisFetchResultFuture.cancel(true);
- } catch (Exception e) {
- // Ignored
- }
- return buildKinesisMessageBatch(startOffset, messages, false);
- }
-
- private KinesisMessageBatch getResult(KinesisPartitionGroupOffset
startOffset, List<BytesStreamMessage> messages) {
- try {
- String shardId = startOffset.getShardId();
- String shardIterator = getShardIterator(shardId,
startOffset.getSequenceNumber());
- boolean endOfShard = false;
- long currentWindow = System.currentTimeMillis() /
SLEEP_TIME_BETWEEN_REQUESTS;
- int currentWindowRequests = 0;
- while (shardIterator != null) {
- GetRecordsRequest getRecordsRequest =
GetRecordsRequest.builder().shardIterator(shardIterator).build();
- long requestSentTime = System.currentTimeMillis() / 1000;
- GetRecordsResponse getRecordsResponse =
_kinesisClient.getRecords(getRecordsRequest);
- List<Record> records = getRecordsResponse.records();
- if (!records.isEmpty()) {
- for (Record record : records) {
- messages.add(extractStreamMessage(record, shardId));
- }
- if (messages.size() >= _config.getNumMaxRecordsToFetch()) {
- break;
- }
+ String shardId = startOffset.getShardId();
+ String startSequenceNumber = startOffset.getSequenceNumber();
+
+ // NOTE: Kinesis enforces a limit of 5 getRecords request per second on
each shard from AWS end, beyond which we
+ // start getting ProvisionedThroughputExceededException. Rate limit
the requests to avoid this.
+ long currentTimeMs = System.currentTimeMillis();
+ int currentTimeSeconds = (int)
TimeUnit.MILLISECONDS.toSeconds(currentTimeMs);
+ if (currentTimeSeconds == _currentSecond) {
+ if (_numRequestsInCurrentSecond == _config.getRpsLimit()) {
+ try {
+ Thread.sleep(1000 - (currentTimeMs % 1000));
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
}
-
- if (getRecordsResponse.hasChildShards() &&
!getRecordsResponse.childShards().isEmpty()) {
- //This statement returns true only when end of current shard has
reached.
- // hasChildShards only checks if the childShard is null and is a
valid instance.
- endOfShard = true;
- break;
- }
-
- shardIterator = getRecordsResponse.nextShardIterator();
-
- if (Thread.interrupted()) {
- break;
- }
-
- // Kinesis enforces a limit of 5 .getRecords request per second on
each shard from AWS end
- // Beyond this limit we start getting
ProvisionedThroughputExceededException which affect the ingestion
- if (requestSentTime == currentWindow) {
- currentWindowRequests++;
- } else if (requestSentTime > currentWindow) {
- currentWindow = requestSentTime;
- currentWindowRequests = 0;
- }
-
- if (currentWindowRequests >= _config.getNumMaxRecordsToFetch()) {
- try {
- Thread.sleep(SLEEP_TIME_BETWEEN_REQUESTS);
- } catch (InterruptedException e) {
- LOGGER.debug("Sleep interrupted while rate limiting Kinesis
requests", e);
- break;
- }
- }
- }
-
- return buildKinesisMessageBatch(startOffset, messages, endOfShard);
- } catch (IllegalStateException e) {
- debugOrLogWarning("Illegal state exception, connection is broken", e);
- } catch (ProvisionedThroughputExceededException e) {
- debugOrLogWarning("The request rate for the stream is too high", e);
- } catch (ExpiredIteratorException e) {
- debugOrLogWarning("ShardIterator expired while trying to fetch records",
e);
- } catch (ResourceNotFoundException | InvalidArgumentException e) {
- // aws errors
- LOGGER.error("Encountered AWS error while attempting to fetch records",
e);
- } catch (KinesisException e) {
- debugOrLogWarning("Encountered unknown unrecoverable AWS exception", e);
- throw new RuntimeException(e);
- } catch (AbortedException e) {
- if (!(e.getCause() instanceof InterruptedException)) {
- debugOrLogWarning("Task aborted due to exception", e);
+ _currentSecond++;
+ _numRequestsInCurrentSecond = 1;
+ } else {
+ _numRequestsInCurrentSecond++;
}
- } catch (Throwable e) {
- // non transient errors
- LOGGER.error("Unknown fetchRecords exception", e);
- throw new RuntimeException(e);
+ } else {
+ _currentSecond = currentTimeSeconds;
+ _numRequestsInCurrentSecond = 1;
}
- return buildKinesisMessageBatch(startOffset, messages, false);
- }
- private void debugOrLogWarning(String message, Throwable throwable) {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug(message, throwable);
+ // Get the shard iterator
+ String shardIterator;
+ if (startSequenceNumber.equals(_nextStartSequenceNumber)) {
+ shardIterator = _nextShardIterator;
} else {
- LOGGER.warn(message + ": " + throwable.getMessage());
+ // TODO: Revisit this logic to see if we always miss the first message
when consuming from a new shard
+ GetShardIteratorRequest getShardIteratorRequest =
+
GetShardIteratorRequest.builder().streamName(_config.getStreamTopicName()).shardId(shardId)
+
.startingSequenceNumber(startSequenceNumber).shardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER)
+ .build();
+ shardIterator =
_kinesisClient.getShardIterator(getShardIteratorRequest).shardIterator();
+ }
+ if (shardIterator == null) {
+ return new KinesisMessageBatch(List.of(), startOffset, true);
}
- }
- private KinesisMessageBatch
buildKinesisMessageBatch(KinesisPartitionGroupOffset startOffset,
- List<BytesStreamMessage> messages, boolean endOfShard) {
+ // Read records
+ GetRecordsRequest getRecordRequest =
+
GetRecordsRequest.builder().shardIterator(shardIterator).limit(_config.getNumMaxRecordsToFetch()).build();
+ GetRecordsResponse getRecordsResponse =
_kinesisClient.getRecords(getRecordRequest);
Review Comment:
Good point. We can also add a metric to track this when it happens.
##########
pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java:
##########
@@ -69,134 +61,77 @@ public KinesisConsumer(KinesisConfig config, KinesisClient
kinesisClient) {
super(config, kinesisClient);
}
- /**
- * Fetch records from the Kinesis stream between the start and end
KinesisCheckpoint
- */
@Override
- public KinesisMessageBatch fetchMessages(StreamPartitionMsgOffset
startMsgOffset, int timeoutMs) {
+ public synchronized KinesisMessageBatch
fetchMessages(StreamPartitionMsgOffset startMsgOffset, int timeoutMs) {
KinesisPartitionGroupOffset startOffset = (KinesisPartitionGroupOffset)
startMsgOffset;
- List<BytesStreamMessage> messages = new ArrayList<>();
- Future<KinesisMessageBatch> kinesisFetchResultFuture =
- _executorService.submit(() -> getResult(startOffset, messages));
- try {
- return kinesisFetchResultFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
- } catch (TimeoutException e) {
- kinesisFetchResultFuture.cancel(true);
- } catch (Exception e) {
- // Ignored
- }
- return buildKinesisMessageBatch(startOffset, messages, false);
- }
-
- private KinesisMessageBatch getResult(KinesisPartitionGroupOffset
startOffset, List<BytesStreamMessage> messages) {
- try {
- String shardId = startOffset.getShardId();
- String shardIterator = getShardIterator(shardId,
startOffset.getSequenceNumber());
- boolean endOfShard = false;
- long currentWindow = System.currentTimeMillis() /
SLEEP_TIME_BETWEEN_REQUESTS;
- int currentWindowRequests = 0;
- while (shardIterator != null) {
- GetRecordsRequest getRecordsRequest =
GetRecordsRequest.builder().shardIterator(shardIterator).build();
- long requestSentTime = System.currentTimeMillis() / 1000;
- GetRecordsResponse getRecordsResponse =
_kinesisClient.getRecords(getRecordsRequest);
- List<Record> records = getRecordsResponse.records();
- if (!records.isEmpty()) {
- for (Record record : records) {
- messages.add(extractStreamMessage(record, shardId));
- }
- if (messages.size() >= _config.getNumMaxRecordsToFetch()) {
- break;
- }
- }
-
- if (getRecordsResponse.hasChildShards() &&
!getRecordsResponse.childShards().isEmpty()) {
- //This statement returns true only when end of current shard has
reached.
- // hasChildShards only checks if the childShard is null and is a
valid instance.
- endOfShard = true;
- break;
- }
-
- shardIterator = getRecordsResponse.nextShardIterator();
-
- if (Thread.interrupted()) {
- break;
- }
-
- // Kinesis enforces a limit of 5 .getRecords request per second on
each shard from AWS end
- // Beyond this limit we start getting
ProvisionedThroughputExceededException which affect the ingestion
- if (requestSentTime == currentWindow) {
- currentWindowRequests++;
- } else if (requestSentTime > currentWindow) {
- currentWindow = requestSentTime;
- currentWindowRequests = 0;
- }
-
- if (currentWindowRequests >= _config.getNumMaxRecordsToFetch()) {
- try {
- Thread.sleep(SLEEP_TIME_BETWEEN_REQUESTS);
- } catch (InterruptedException e) {
- LOGGER.debug("Sleep interrupted while rate limiting Kinesis
requests", e);
- break;
- }
- }
- }
-
- return buildKinesisMessageBatch(startOffset, messages, endOfShard);
- } catch (IllegalStateException e) {
- debugOrLogWarning("Illegal state exception, connection is broken", e);
- } catch (ProvisionedThroughputExceededException e) {
- debugOrLogWarning("The request rate for the stream is too high", e);
- } catch (ExpiredIteratorException e) {
- debugOrLogWarning("ShardIterator expired while trying to fetch records",
e);
- } catch (ResourceNotFoundException | InvalidArgumentException e) {
- // aws errors
- LOGGER.error("Encountered AWS error while attempting to fetch records",
e);
- } catch (KinesisException e) {
- debugOrLogWarning("Encountered unknown unrecoverable AWS exception", e);
- throw new RuntimeException(e);
- } catch (AbortedException e) {
- if (!(e.getCause() instanceof InterruptedException)) {
- debugOrLogWarning("Task aborted due to exception", e);
- }
- } catch (Throwable e) {
- // non transient errors
- LOGGER.error("Unknown fetchRecords exception", e);
- throw new RuntimeException(e);
- }
- return buildKinesisMessageBatch(startOffset, messages, false);
- }
+ String shardId = startOffset.getShardId();
+ String startSequenceNumber = startOffset.getSequenceNumber();
- private void debugOrLogWarning(String message, Throwable throwable) {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug(message, throwable);
+ // Get the shard iterator
+ String shardIterator;
+ if (startSequenceNumber.equals(_nextStartSequenceNumber)) {
+ shardIterator = _nextShardIterator;
} else {
- LOGGER.warn(message + ": " + throwable.getMessage());
+ // TODO: Revisit the offset handling logic. Reading after the start
sequence number can lose the first message
+ // when consuming from a new partition because the initial start
sequence number is inclusive.
+ GetShardIteratorRequest getShardIteratorRequest =
+
GetShardIteratorRequest.builder().streamName(_config.getStreamTopicName()).shardId(shardId)
+
.startingSequenceNumber(startSequenceNumber).shardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER)
+ .build();
+ shardIterator =
_kinesisClient.getShardIterator(getShardIteratorRequest).shardIterator();
+ }
+ if (shardIterator == null) {
+ return new KinesisMessageBatch(List.of(), startOffset, true);
}
- }
- private KinesisMessageBatch
buildKinesisMessageBatch(KinesisPartitionGroupOffset startOffset,
- List<BytesStreamMessage> messages, boolean endOfShard) {
+ // Read records
+ rateLimitRequests();
+ GetRecordsRequest getRecordRequest =
+
GetRecordsRequest.builder().shardIterator(shardIterator).limit(_config.getNumMaxRecordsToFetch()).build();
+ GetRecordsResponse getRecordsResponse =
_kinesisClient.getRecords(getRecordRequest);
+ List<Record> records = getRecordsResponse.records();
+ List<BytesStreamMessage> messages;
KinesisPartitionGroupOffset offsetOfNextBatch;
- if (messages.isEmpty()) {
- offsetOfNextBatch = startOffset;
- } else {
+ if (!records.isEmpty()) {
+ messages = records.stream().map(record -> extractStreamMessage(record,
shardId)).collect(Collectors.toList());
StreamMessageMetadata lastMessageMetadata = messages.get(messages.size()
- 1).getMetadata();
assert lastMessageMetadata != null;
offsetOfNextBatch = (KinesisPartitionGroupOffset)
lastMessageMetadata.getNextOffset();
+ } else {
+ // TODO: Revisit whether Kinesis can return empty batch when there are
available records. The consumer cna handle
+ // empty message batch, but it will treat it as fully caught up.
+ messages = List.of();
+ offsetOfNextBatch = startOffset;
}
- return new KinesisMessageBatch(messages, offsetOfNextBatch, endOfShard);
+ assert offsetOfNextBatch != null;
+ _nextStartSequenceNumber = offsetOfNextBatch.getSequenceNumber();
+ _nextShardIterator = getRecordsResponse.nextShardIterator();
+ return new KinesisMessageBatch(messages, offsetOfNextBatch,
_nextShardIterator == null);
}
- private String getShardIterator(String shardId, String sequenceNumber) {
- GetShardIteratorRequest.Builder requestBuilder =
-
GetShardIteratorRequest.builder().streamName(_config.getStreamTopicName()).shardId(shardId);
- if (sequenceNumber != null) {
- requestBuilder = requestBuilder.startingSequenceNumber(sequenceNumber)
- .shardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER);
+ /**
+ * Kinesis enforces a limit of 5 getRecords request per second on each shard
from AWS end, beyond which we start
+ * getting {@link ProvisionedThroughputExceededException}. Rate limit the
requests to avoid this.
+ */
+ private void rateLimitRequests() {
Review Comment:
Thanks for creating a separate method. I guess that, because this is a special
kind of rate limiter that needs to block until we are ready to fetch again, we
cannot leverage off-the-shelf ones like Guava's.
If Kinesis has a limit, don't we need to adhere to that limit? In other words,
does getRpsLimit() need to match the Kinesis limit?
##########
pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java:
##########
@@ -69,134 +61,77 @@ public KinesisConsumer(KinesisConfig config, KinesisClient
kinesisClient) {
super(config, kinesisClient);
}
- /**
- * Fetch records from the Kinesis stream between the start and end
KinesisCheckpoint
- */
@Override
- public KinesisMessageBatch fetchMessages(StreamPartitionMsgOffset
startMsgOffset, int timeoutMs) {
+ public synchronized KinesisMessageBatch
fetchMessages(StreamPartitionMsgOffset startMsgOffset, int timeoutMs) {
KinesisPartitionGroupOffset startOffset = (KinesisPartitionGroupOffset)
startMsgOffset;
- List<BytesStreamMessage> messages = new ArrayList<>();
- Future<KinesisMessageBatch> kinesisFetchResultFuture =
- _executorService.submit(() -> getResult(startOffset, messages));
- try {
- return kinesisFetchResultFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
- } catch (TimeoutException e) {
- kinesisFetchResultFuture.cancel(true);
- } catch (Exception e) {
- // Ignored
- }
- return buildKinesisMessageBatch(startOffset, messages, false);
- }
-
- private KinesisMessageBatch getResult(KinesisPartitionGroupOffset
startOffset, List<BytesStreamMessage> messages) {
- try {
- String shardId = startOffset.getShardId();
- String shardIterator = getShardIterator(shardId,
startOffset.getSequenceNumber());
- boolean endOfShard = false;
- long currentWindow = System.currentTimeMillis() /
SLEEP_TIME_BETWEEN_REQUESTS;
- int currentWindowRequests = 0;
- while (shardIterator != null) {
- GetRecordsRequest getRecordsRequest =
GetRecordsRequest.builder().shardIterator(shardIterator).build();
- long requestSentTime = System.currentTimeMillis() / 1000;
- GetRecordsResponse getRecordsResponse =
_kinesisClient.getRecords(getRecordsRequest);
- List<Record> records = getRecordsResponse.records();
- if (!records.isEmpty()) {
- for (Record record : records) {
- messages.add(extractStreamMessage(record, shardId));
- }
- if (messages.size() >= _config.getNumMaxRecordsToFetch()) {
- break;
- }
- }
-
- if (getRecordsResponse.hasChildShards() &&
!getRecordsResponse.childShards().isEmpty()) {
- //This statement returns true only when end of current shard has
reached.
- // hasChildShards only checks if the childShard is null and is a
valid instance.
- endOfShard = true;
- break;
- }
-
- shardIterator = getRecordsResponse.nextShardIterator();
-
- if (Thread.interrupted()) {
- break;
- }
-
- // Kinesis enforces a limit of 5 .getRecords request per second on
each shard from AWS end
- // Beyond this limit we start getting
ProvisionedThroughputExceededException which affect the ingestion
- if (requestSentTime == currentWindow) {
- currentWindowRequests++;
- } else if (requestSentTime > currentWindow) {
- currentWindow = requestSentTime;
- currentWindowRequests = 0;
- }
-
- if (currentWindowRequests >= _config.getNumMaxRecordsToFetch()) {
- try {
- Thread.sleep(SLEEP_TIME_BETWEEN_REQUESTS);
- } catch (InterruptedException e) {
- LOGGER.debug("Sleep interrupted while rate limiting Kinesis
requests", e);
- break;
- }
- }
- }
-
- return buildKinesisMessageBatch(startOffset, messages, endOfShard);
- } catch (IllegalStateException e) {
- debugOrLogWarning("Illegal state exception, connection is broken", e);
- } catch (ProvisionedThroughputExceededException e) {
- debugOrLogWarning("The request rate for the stream is too high", e);
- } catch (ExpiredIteratorException e) {
- debugOrLogWarning("ShardIterator expired while trying to fetch records",
e);
- } catch (ResourceNotFoundException | InvalidArgumentException e) {
- // aws errors
- LOGGER.error("Encountered AWS error while attempting to fetch records",
e);
- } catch (KinesisException e) {
- debugOrLogWarning("Encountered unknown unrecoverable AWS exception", e);
- throw new RuntimeException(e);
- } catch (AbortedException e) {
- if (!(e.getCause() instanceof InterruptedException)) {
- debugOrLogWarning("Task aborted due to exception", e);
- }
- } catch (Throwable e) {
- // non transient errors
- LOGGER.error("Unknown fetchRecords exception", e);
- throw new RuntimeException(e);
- }
- return buildKinesisMessageBatch(startOffset, messages, false);
- }
+ String shardId = startOffset.getShardId();
+ String startSequenceNumber = startOffset.getSequenceNumber();
- private void debugOrLogWarning(String message, Throwable throwable) {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug(message, throwable);
+ // Get the shard iterator
+ String shardIterator;
+ if (startSequenceNumber.equals(_nextStartSequenceNumber)) {
+ shardIterator = _nextShardIterator;
} else {
- LOGGER.warn(message + ": " + throwable.getMessage());
+ // TODO: Revisit the offset handling logic. Reading after the start
sequence number can lose the first message
+ // when consuming from a new partition because the initial start
sequence number is inclusive.
+ GetShardIteratorRequest getShardIteratorRequest =
+
GetShardIteratorRequest.builder().streamName(_config.getStreamTopicName()).shardId(shardId)
+
.startingSequenceNumber(startSequenceNumber).shardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER)
+ .build();
+ shardIterator =
_kinesisClient.getShardIterator(getShardIteratorRequest).shardIterator();
+ }
+ if (shardIterator == null) {
+ return new KinesisMessageBatch(List.of(), startOffset, true);
}
- }
- private KinesisMessageBatch
buildKinesisMessageBatch(KinesisPartitionGroupOffset startOffset,
- List<BytesStreamMessage> messages, boolean endOfShard) {
+ // Read records
+ rateLimitRequests();
+ GetRecordsRequest getRecordRequest =
+
GetRecordsRequest.builder().shardIterator(shardIterator).limit(_config.getNumMaxRecordsToFetch()).build();
+ GetRecordsResponse getRecordsResponse =
_kinesisClient.getRecords(getRecordRequest);
+ List<Record> records = getRecordsResponse.records();
+ List<BytesStreamMessage> messages;
KinesisPartitionGroupOffset offsetOfNextBatch;
- if (messages.isEmpty()) {
- offsetOfNextBatch = startOffset;
- } else {
+ if (!records.isEmpty()) {
+ messages = records.stream().map(record -> extractStreamMessage(record,
shardId)).collect(Collectors.toList());
StreamMessageMetadata lastMessageMetadata = messages.get(messages.size()
- 1).getMetadata();
assert lastMessageMetadata != null;
offsetOfNextBatch = (KinesisPartitionGroupOffset)
lastMessageMetadata.getNextOffset();
+ } else {
+ // TODO: Revisit whether Kinesis can return empty batch when there are
available records. The consumer cna handle
+ // empty message batch, but it will treat it as fully caught up.
+ messages = List.of();
+ offsetOfNextBatch = startOffset;
}
- return new KinesisMessageBatch(messages, offsetOfNextBatch, endOfShard);
+ assert offsetOfNextBatch != null;
+ _nextStartSequenceNumber = offsetOfNextBatch.getSequenceNumber();
+ _nextShardIterator = getRecordsResponse.nextShardIterator();
+ return new KinesisMessageBatch(messages, offsetOfNextBatch,
_nextShardIterator == null);
}
- private String getShardIterator(String shardId, String sequenceNumber) {
- GetShardIteratorRequest.Builder requestBuilder =
-
GetShardIteratorRequest.builder().streamName(_config.getStreamTopicName()).shardId(shardId);
- if (sequenceNumber != null) {
- requestBuilder = requestBuilder.startingSequenceNumber(sequenceNumber)
- .shardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER);
+ /**
+ * Kinesis enforces a limit of 5 getRecords request per second on each shard
from AWS end, beyond which we start
+ * getting {@link ProvisionedThroughputExceededException}. Rate limit the
requests to avoid this.
+ */
+ private void rateLimitRequests() {
+ long currentTimeMs = System.currentTimeMillis();
+ int currentTimeSeconds = (int)
TimeUnit.MILLISECONDS.toSeconds(currentTimeMs);
+ if (currentTimeSeconds == _currentSecond) {
+ if (_numRequestsInCurrentSecond == _config.getRpsLimit()) {
Review Comment:
This can be done later. A log.info or metric would help debug if rate
limiting becomes an issue.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]