Copilot commented on code in PR #63079:
URL: https://github.com/apache/doris/pull/63079#discussion_r3226605189
##########
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java:
##########
@@ -533,78 +630,193 @@ public String getPersistInfo() {
return GsonUtils.GSON.toJson(this);
}
- public void splitChunks(List<String> createTbls) throws JobException {
- // todo: When splitting takes a long time, it needs to be changed to
asynchronous.
- if (checkNeedSplitChunks(sourceProperties)) {
- Map<String, List<SnapshotSplit>> tableSplits = new
LinkedHashMap<>();
- for (String tbl : createTbls) {
- List<SnapshotSplit> snapshotSplits = requestTableSplits(tbl);
- tableSplits.put(tbl, snapshotSplits);
+ // ============ Async split progress (driven by scheduler each tick)
============
+
+ /**
+ * One-time setup at CREATE.
+ * - initial/snapshot mode: init split progress; scheduler will drive
advanceSplits() each tick.
+ * - latest mode (and other non-splitting modes): open the remote reader
(e.g. PG slot) so the
+ * binlog phase can start immediately; no snapshot splitting will happen.
+ */
+ @Override
+ public void initOnCreate(List<String> syncTables) throws JobException {
+ if (!checkNeedSplitChunks(sourceProperties)) {
+ initSourceReader();
+ return;
+ }
+ synchronized (splitsLock) {
+ this.cachedSyncTables = syncTables;
+ this.committedSplitProgress = new SplitProgress();
+ this.cdcSplitProgress = new SplitProgress();
+ }
+ }
+
+ @Override
+ public boolean noMoreSplits() {
+ if (!checkNeedSplitChunks(sourceProperties)) {
+ return true;
+ }
+ synchronized (splitsLock) {
+ return cdcSplitProgress.getCurrentSplittingTable() == null
+ && computeCdcRemainingTables().isEmpty();
+ }
+ }
+
+ /** Tables not yet touched by cdc fetching. Caller must hold splitsLock. */
+ private List<String> computeCdcRemainingTables() {
+ if (cachedSyncTables == null || cachedSyncTables.isEmpty()) {
+ return Collections.emptyList();
+ }
+ Set<String> touched = new HashSet<>();
+ for (SnapshotSplit s : finishedSplits) {
+ touched.add(s.getTableId());
+ }
+ for (SnapshotSplit s : remainingSplits) {
+ touched.add(s.getTableId());
+ }
+ if (cdcSplitProgress.getCurrentSplittingTable() != null) {
+ touched.add(cdcSplitProgress.getCurrentSplittingTable());
+ }
+ List<String> result = new ArrayList<>(cachedSyncTables.size());
+ for (String t : cachedSyncTables) {
+ if (!touched.contains(t)) {
+ result.add(t);
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public void advanceSplits() throws JobException {
+ synchronized (splitsLock) {
+ // 1. Pick next table if not currently splitting one.
+ if (cdcSplitProgress.getCurrentSplittingTable() == null) {
+ List<String> remaining = computeCdcRemainingTables();
+ if (remaining.isEmpty()) {
+ return;
+ }
+ cdcSplitProgress.setCurrentSplittingTable(remaining.get(0));
+ cdcSplitProgress.setNextSplitStart(null);
+ cdcSplitProgress.setNextSplitId(null);
+ }
+ String tbl = cdcSplitProgress.getCurrentSplittingTable();
+ Object[] startVal = cdcSplitProgress.getNextSplitStart();
+ Integer splitId = cdcSplitProgress.getNextSplitId();
+
+ // 2. RPC under lock — updateOffset may wait briefly
+ List<SnapshotSplit> batch = rpcFetchSplitsBatch(tbl, startVal,
splitId);
+ if (batch == null || batch.isEmpty()) {
+ return;
+ }
+
+ // 3. mergeBySplitId (defensive dedup).
+ Set<String> existingIds = new HashSet<>();
+ finishedSplits.forEach(s -> existingIds.add(s.getSplitId()));
+ remainingSplits.forEach(s -> existingIds.add(s.getSplitId()));
+ List<SnapshotSplit> newSplits = new ArrayList<>();
+ for (SnapshotSplit s : batch) {
+ if (!existingIds.contains(s.getSplitId())) {
+ newSplits.add(s);
+ }
}
- // save chunk list to system table
- saveChunkMeta(tableSplits);
- this.remainingSplits = tableSplits.values().stream()
- .flatMap(List::stream)
+ if (newSplits.size() < batch.size()) {
+ log.info("advanceSplits dedup'd {} duplicate splits (batch={},
new={}) for job {} table {}",
+ batch.size() - newSplits.size(), batch.size(),
newSplits.size(), getJobId(), tbl);
+ }
+ remainingSplits.addAll(newSplits);
+
+ // 4. UPSERT this table's full chunk_list to system table.
+ List<SnapshotSplit> splitsOfTbl = Stream.concat(
+ finishedSplits.stream(), remainingSplits.stream())
+ .filter(s -> tbl.equals(s.getTableId()))
+ .sorted(Comparator.comparingInt(s ->
splitIdOf(s.getSplitId())))
.collect(Collectors.toList());
- } else {
- // The source reader is automatically initialized when the split
is obtained.
- // In latest mode, a separate init is required.init source reader
- initSourceReader();
+ try {
+ StreamingJobUtils.upsertChunkList(getJobId(), tbl,
splitsOfTbl);
+ } catch (Exception e) {
+ throw new JobException("UPSERT chunk_list failed for " + tbl +
": " + e.getMessage());
+ }
+
+ // 5. Advance cdcSplitProgress to the last split in the batch.
+ applySplitToProgress(cdcSplitProgress, batch.get(batch.size() -
1));
+ log.info("advanceSplits jobId={} table={} request(nextStart={},
nextSplitId={}) "
Review Comment:
In `advanceSplits`, `applySplitToProgress(cdcSplitProgress, batchLast)` sets
`currentSplittingTable` from `SnapshotSplit.tableId`, but `rpcFetchSplitsBatch`
later passes `currentSplittingTable` back as
`FetchTableSplitsRequest.snapshotTable`. In practice `SnapshotSplit.tableId` is
typically qualified (e.g. `schema.table` / `db.table`), while `snapshotTable`
is expected to be the unqualified table name (the code in cdc_client prefixes
DB/schema again). This will cause the next fetchSplits RPC to target an invalid
table id (double-prefixed) and also breaks `computeCdcRemainingTables()` /
`splitsOfTbl` filtering due to mixed qualified vs unqualified identifiers. Keep
`SplitProgress.currentSplittingTable` in the same namespace as
`syncTables`/`snapshotTable` (likely unqualified), and compare/group splits by
the same normalized key (e.g. extract table name from `split.getTableId()` when
needed) instead of using `split.getTableId()` directly.
##########
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java:
##########
@@ -533,78 +630,193 @@ public String getPersistInfo() {
return GsonUtils.GSON.toJson(this);
}
- public void splitChunks(List<String> createTbls) throws JobException {
- // todo: When splitting takes a long time, it needs to be changed to
asynchronous.
- if (checkNeedSplitChunks(sourceProperties)) {
- Map<String, List<SnapshotSplit>> tableSplits = new
LinkedHashMap<>();
- for (String tbl : createTbls) {
- List<SnapshotSplit> snapshotSplits = requestTableSplits(tbl);
- tableSplits.put(tbl, snapshotSplits);
+ // ============ Async split progress (driven by scheduler each tick)
============
+
+ /**
+ * One-time setup at CREATE.
+ * - initial/snapshot mode: init split progress; scheduler will drive
advanceSplits() each tick.
+ * - latest mode (and other non-splitting modes): open the remote reader
(e.g. PG slot) so the
+ * binlog phase can start immediately; no snapshot splitting will happen.
+ */
+ @Override
+ public void initOnCreate(List<String> syncTables) throws JobException {
+ if (!checkNeedSplitChunks(sourceProperties)) {
+ initSourceReader();
+ return;
+ }
+ synchronized (splitsLock) {
+ this.cachedSyncTables = syncTables;
+ this.committedSplitProgress = new SplitProgress();
+ this.cdcSplitProgress = new SplitProgress();
+ }
+ }
+
+ @Override
+ public boolean noMoreSplits() {
+ if (!checkNeedSplitChunks(sourceProperties)) {
+ return true;
+ }
+ synchronized (splitsLock) {
+ return cdcSplitProgress.getCurrentSplittingTable() == null
+ && computeCdcRemainingTables().isEmpty();
+ }
+ }
+
+ /** Tables not yet touched by cdc fetching. Caller must hold splitsLock. */
+ private List<String> computeCdcRemainingTables() {
+ if (cachedSyncTables == null || cachedSyncTables.isEmpty()) {
+ return Collections.emptyList();
+ }
+ Set<String> touched = new HashSet<>();
+ for (SnapshotSplit s : finishedSplits) {
+ touched.add(s.getTableId());
+ }
+ for (SnapshotSplit s : remainingSplits) {
+ touched.add(s.getTableId());
+ }
+ if (cdcSplitProgress.getCurrentSplittingTable() != null) {
+ touched.add(cdcSplitProgress.getCurrentSplittingTable());
+ }
+ List<String> result = new ArrayList<>(cachedSyncTables.size());
+ for (String t : cachedSyncTables) {
+ if (!touched.contains(t)) {
+ result.add(t);
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public void advanceSplits() throws JobException {
+ synchronized (splitsLock) {
+ // 1. Pick next table if not currently splitting one.
+ if (cdcSplitProgress.getCurrentSplittingTable() == null) {
+ List<String> remaining = computeCdcRemainingTables();
+ if (remaining.isEmpty()) {
+ return;
+ }
+ cdcSplitProgress.setCurrentSplittingTable(remaining.get(0));
+ cdcSplitProgress.setNextSplitStart(null);
+ cdcSplitProgress.setNextSplitId(null);
+ }
+ String tbl = cdcSplitProgress.getCurrentSplittingTable();
+ Object[] startVal = cdcSplitProgress.getNextSplitStart();
+ Integer splitId = cdcSplitProgress.getNextSplitId();
+
+ // 2. RPC under lock — updateOffset may wait briefly
+ List<SnapshotSplit> batch = rpcFetchSplitsBatch(tbl, startVal,
splitId);
+ if (batch == null || batch.isEmpty()) {
+ return;
+ }
+
+ // 3. mergeBySplitId (defensive dedup).
+ Set<String> existingIds = new HashSet<>();
+ finishedSplits.forEach(s -> existingIds.add(s.getSplitId()));
+ remainingSplits.forEach(s -> existingIds.add(s.getSplitId()));
+ List<SnapshotSplit> newSplits = new ArrayList<>();
+ for (SnapshotSplit s : batch) {
+ if (!existingIds.contains(s.getSplitId())) {
+ newSplits.add(s);
+ }
}
- // save chunk list to system table
- saveChunkMeta(tableSplits);
- this.remainingSplits = tableSplits.values().stream()
- .flatMap(List::stream)
+ if (newSplits.size() < batch.size()) {
+ log.info("advanceSplits dedup'd {} duplicate splits (batch={},
new={}) for job {} table {}",
+ batch.size() - newSplits.size(), batch.size(),
newSplits.size(), getJobId(), tbl);
+ }
+ remainingSplits.addAll(newSplits);
+
+ // 4. UPSERT this table's full chunk_list to system table.
+ List<SnapshotSplit> splitsOfTbl = Stream.concat(
+ finishedSplits.stream(), remainingSplits.stream())
+ .filter(s -> tbl.equals(s.getTableId()))
+ .sorted(Comparator.comparingInt(s ->
splitIdOf(s.getSplitId())))
.collect(Collectors.toList());
- } else {
- // The source reader is automatically initialized when the split
is obtained.
- // In latest mode, a separate init is required.init source reader
- initSourceReader();
+ try {
+ StreamingJobUtils.upsertChunkList(getJobId(), tbl,
splitsOfTbl);
+ } catch (Exception e) {
+ throw new JobException("UPSERT chunk_list failed for " + tbl +
": " + e.getMessage());
+ }
Review Comment:
`advanceSplits()` holds `splitsLock` while performing the remote
`rpcFetchSplitsBatch()` call and the system-table `upsertChunkList()` (internal
SQL). Either of these can block for seconds/minutes, which will also block
`updateOffset()`/`getNextOffset()` and can stall task commits or scheduling.
Consider doing the RPC + UPSERT outside the critical section: snapshot the
current progress under lock, release the lock to fetch/UPSERT, then re-acquire
the lock to merge the batch if the progress/table still matches (or abort if it
changed).
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java:
##########
@@ -133,54 +139,105 @@ public void initialize(String jobId, DataSource
dataSource, Map<String, String>
LOG.info("Initialized poll executor with parallelism: {}",
parallelism);
}
+ /**
+ * Fetch a batch of snapshot splits by driving flink-cdc {@link
ChunkSplitter} directly.
+ *
+ * <p>Stateless: each RPC builds a fresh splitter from the (table,
nextChunkStart, nextChunkId)
+ * triple supplied by FE, fetches up to {@code batchSize} chunks, then
closes the splitter.
+ *
+ * <p>Only INITIAL/SNAPSHOT startup modes call this RPC; binlog/latest
modes never reach here.
+ */
@Override
public List<AbstractSourceSplit> getSourceSplits(FetchTableSplitsRequest
ftsReq) {
- LOG.info("Get table {} splits for job {}", ftsReq.getSnapshotTable(),
ftsReq.getJobId());
+ LOG.info(
+ "Get table {} splits for job {} (nextSplitId={},
nextSplitStart={})",
+ ftsReq.getSnapshotTable(),
+ ftsReq.getJobId(),
+ ftsReq.getNextSplitId(),
+ java.util.Arrays.toString(ftsReq.getNextSplitStart()));
JdbcSourceConfig sourceConfig = getSourceConfig(ftsReq);
-
List<org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit>
- remainingSnapshotSplits = new ArrayList<>();
- StreamSplit remainingStreamSplit = null;
-
- // Check startup mode - for PostgreSQL, we use similar logic as MySQL
- String startupMode =
ftsReq.getConfig().get(DataSourceConfigKeys.OFFSET);
- if (DataSourceConfigKeys.OFFSET_INITIAL.equalsIgnoreCase(startupMode)
- ||
DataSourceConfigKeys.OFFSET_SNAPSHOT.equalsIgnoreCase(startupMode)) {
- remainingSnapshotSplits =
- startSplitChunks(sourceConfig, ftsReq.getSnapshotTable(),
ftsReq.getConfig());
- } else {
- // For non-initial mode, create a stream split
- Offset startingOffset = createInitialOffset();
- remainingStreamSplit =
- new StreamSplit(
- STREAM_SPLIT_ID,
- startingOffset,
- createNoStoppingOffset(),
- new ArrayList<>(),
- new HashMap<>(),
- 0);
+ String schema = ftsReq.getConfig().get(DataSourceConfigKeys.SCHEMA);
+ TableId tableId = new TableId(null, schema, ftsReq.getSnapshotTable());
+
+ int batchSize = ftsReq.getBatchSize() == null ? 100 :
ftsReq.getBatchSize();
+ ChunkSplitterState state = buildChunkSplitterState(sourceConfig,
tableId, ftsReq);
+ ChunkSplitter splitter =
getDialect(sourceConfig).createChunkSplitter(sourceConfig, state);
+
+ try {
+ splitter.open();
+ List<AbstractSourceSplit> result = new ArrayList<>();
+ while (result.size() < batchSize) {
+
Collection<org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit>
+ chunks = splitter.generateSplits(tableId);
+ for
(org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit chunk :
chunks) {
+ result.add(toDorisSnapshotSplit(chunk));
+ }
+ if (!splitter.hasNextChunk()) {
+ break;
+ }
+ }
Review Comment:
Same as MySQL path: the `while (result.size() < batchSize)` loop adds all
elements from `splitter.generateSplits(tableId)` without enforcing the
`batchSize` limit if a single `generateSplits` call returns multiple chunks.
This can produce responses larger than intended. Stop adding once
`result.size()` reaches `batchSize` (or trim) so the RPC stays bounded.
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java:
##########
@@ -158,46 +162,131 @@ public void initialize(String jobId, DataSource
dataSource, Map<String, String>
LOG.info("Initialized poll executor with parallelism: {}",
parallelism);
}
+ /**
+ * Fetch a batch of snapshot splits by driving flink-cdc {@link
MySqlChunkSplitter} directly.
+ *
+ * <p>Stateless: each RPC rebuilds the splitter from (table,
nextSplitStart, nextSplitId)
+ * supplied by FE, splits up to {@code batchSize} chunks, then closes.
Note: evenly-distributed
+ * PKs go through a single splitChunks() call returning all chunks at
once, so batchSize is only
+ * effective on the uneven path.
+ *
+ * <p>Only INITIAL/SNAPSHOT startup modes go through the chunk path; other
modes return a
+ * single BinlogSplit instead.
+ */
@Override
public List<AbstractSourceSplit> getSourceSplits(FetchTableSplitsRequest
ftsReq) {
- LOG.info("Get table {} splits for job {}", ftsReq.getSnapshotTable(),
ftsReq.getJobId());
+ LOG.info(
+ "Get table {} splits for job {} (nextSplitId={},
nextSplitStart={})",
+ ftsReq.getSnapshotTable(),
+ ftsReq.getJobId(),
+ ftsReq.getNextSplitId(),
+ java.util.Arrays.toString(ftsReq.getNextSplitStart()));
MySqlSourceConfig sourceConfig = getSourceConfig(ftsReq);
StartupMode startupMode = sourceConfig.getStartupOptions().startupMode;
- List<MySqlSnapshotSplit> remainingSnapshotSplits = new ArrayList<>();
- MySqlBinlogSplit remainingBinlogSplit = null;
- if (startupMode.equals(StartupMode.INITIAL) ||
startupMode.equals(StartupMode.SNAPSHOT)) {
- remainingSnapshotSplits =
- startSplitChunks(sourceConfig, ftsReq.getSnapshotTable(),
ftsReq.getConfig());
- } else {
- remainingBinlogSplit =
- new MySqlBinlogSplit(
- BINLOG_SPLIT_ID,
- sourceConfig.getStartupOptions().binlogOffset,
- BinlogOffset.ofNonStopping(),
- new ArrayList<>(),
- new HashMap<>(),
- 0);
+
+ if (!startupMode.equals(StartupMode.INITIAL) &&
!startupMode.equals(StartupMode.SNAPSHOT)) {
+ BinlogSplit binlogSplit = new BinlogSplit();
+ binlogSplit.setSplitId(BINLOG_SPLIT_ID);
+
binlogSplit.setStartingOffset(sourceConfig.getStartupOptions().binlogOffset.getOffset());
+ return Collections.singletonList(binlogSplit);
}
- List<AbstractSourceSplit> splits = new ArrayList<>();
- if (!remainingSnapshotSplits.isEmpty()) {
- for (MySqlSnapshotSplit snapshotSplit : remainingSnapshotSplits) {
- String splitId = snapshotSplit.splitId();
- String tableId = snapshotSplit.getTableId().identifier();
- Object[] splitStart = snapshotSplit.getSplitStart();
- Object[] splitEnd = snapshotSplit.getSplitEnd();
- List<String> splitKey =
snapshotSplit.getSplitKeyType().getFieldNames();
- SnapshotSplit split =
- new SnapshotSplit(splitId, tableId, splitKey,
splitStart, splitEnd, null);
- splits.add(split);
+
+ String database =
ftsReq.getConfig().get(DataSourceConfigKeys.DATABASE);
+ TableId tableId = TableId.parse(database + "." +
ftsReq.getSnapshotTable());
+ int batchSize = ftsReq.getBatchSize() == null ? 100 :
ftsReq.getBatchSize();
+
+ boolean isCaseSensitive;
+ try (MySqlConnection jdbc =
DebeziumUtils.createMySqlConnection(sourceConfig)) {
+ isCaseSensitive = DebeziumUtils.isTableIdCaseSensitive(jdbc);
+ } catch (SQLException e) {
+ throw new RuntimeException("Failed to query table id case
sensitivity", e);
+ }
+ MySqlSchema mySqlSchema = new MySqlSchema(sourceConfig,
isCaseSensitive);
+ MySqlPartition partition =
+ new
MySqlPartition(sourceConfig.getMySqlConnectorConfig().getLogicalName());
+
+ ChunkSplitterState state = buildChunkSplitterState(sourceConfig,
tableId, ftsReq, mySqlSchema, partition);
+ MySqlChunkSplitter splitter = new MySqlChunkSplitter(mySqlSchema,
sourceConfig, state);
+
+ try {
+ splitter.open();
+ List<AbstractSourceSplit> result = new ArrayList<>();
+ while (result.size() < batchSize) {
+ List<MySqlSnapshotSplit> chunks =
splitter.splitChunks(partition, tableId);
+ for (MySqlSnapshotSplit chunk : chunks) {
+ result.add(toDorisSnapshotSplit(chunk));
+ }
+ if (!splitter.hasNextChunk()) {
+ break;
+ }
}
Review Comment:
The loop intends to fetch “up to batchSize” splits, but it adds all splits
returned by `splitter.splitChunks(...)` without stopping once `result.size()`
reaches `batchSize`. If `splitChunks()` returns multiple chunks in one call,
this RPC can return more than `batchSize`, undermining the goal of short/
bounded fetchSplits responses. Consider breaking out of the inner loop once
`result.size() == batchSize` (and/or trimming the returned list).
##########
fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java:
##########
@@ -163,21 +169,52 @@ public static void deleteJobMeta(Long jobId) {
}
}
- public static void insertSplitsToMeta(Long jobId, Map<String,
List<SnapshotSplit>> tableSplits) throws Exception {
- List<String> values = new ArrayList<>();
- int index = 1;
- for (Map.Entry<String, List<SnapshotSplit>> entry :
tableSplits.entrySet()) {
- Map<String, String> params = new HashMap<>();
- params.put("id", index + "");
- params.put("job_id", jobId + "");
- params.put("table_name", entry.getKey());
- params.put("chunk_list",
objectMapper.writeValueAsString(entry.getValue()));
- StringSubstitutor stringSubstitutor = new
StringSubstitutor(params);
- String sql =
stringSubstitutor.replace(INSERT_INTO_META_TABLE_TEMPLATE);
- values.add(sql);
- index++;
+ /**
+ * UPSERT a single table's chunk_list. id is reused if the table already
has a row,
+ * otherwise allocated as MAX(id)+1. Relies on UNIQUE KEY (id, job_id) for
in-place override.
+ */
+ public static void upsertChunkList(Long jobId, String tableName,
List<SnapshotSplit> chunks) throws Exception {
+ createMetaTableIfNotExist();
+ Integer id = querySingleTableId(jobId, tableName);
+ if (id == null) {
+ id = queryNextAvailableId(jobId);
+ }
+ Map<String, String> params = new HashMap<>();
+ params.put("id", String.valueOf(id));
+ params.put("job_id", String.valueOf(jobId));
+ params.put("table_name", tableName);
+ params.put("chunk_list", objectMapper.writeValueAsString(chunks));
+ StringSubstitutor sub = new StringSubstitutor(params);
+ String sql = sub.replace(INSERT_INTO_META_TABLE_TEMPLATE);
+ batchInsert(Collections.singletonList(sql));
+ }
Review Comment:
`upsertChunkList()` allocates a new row id via `MAX(id)+1` when a table has
no existing row, and relies on `UNIQUE KEY(id, job_id)` for overwrite. This is
not safe under concurrent upserts for the same job: two callers can pick the
same MAX(id) and then one upsert will overwrite the other table’s row (since
table_name is not part of the unique key), corrupting `streaming_job_meta`.
Prefer making `(job_id, table_name)` the unique key for true per-table upsert
semantics, or implement an atomic id allocation strategy that cannot collide
(and never overwrites an unrelated table row).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]