This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 8abf99ee35e [Fix](insert-overwrite) Fix insert overwrite auto detect
transaction safe (#38103)
8abf99ee35e is described below
commit 8abf99ee35e709849fd5932824506f354cae39d0
Author: zclllhhjj <[email protected]>
AuthorDate: Thu Jul 25 17:46:55 2024 +0800
[Fix](insert-overwrite) Fix insert overwrite auto detect transaction safe
(#38103)
Before, if insert overwrite auto detect failed because of some
transaction conflicts, it will go into an unexpected situation with some
of the partition replace success but some retains.
Now it will wholly success, or wholly failed.
---
.../java/org/apache/doris/catalog/OlapTable.java | 9 +-
.../insertoverwrite/InsertOverwriteManager.java | 66 ++++++++++---
.../doris/insertoverwrite/InsertOverwriteUtil.java | 1 +
.../insert/InsertOverwriteTableCommand.java | 11 ++-
.../apache/doris/service/FrontendServiceImpl.java | 107 ++++++++++++++-------
.../cloud_p1/conf/regression-conf-custom.groovy | 1 -
.../pipeline/p1/conf/regression-conf.groovy | 1 -
.../test_iot_auto_detect_concurrent.groovy | 11 +--
8 files changed, 144 insertions(+), 63 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index e1e3faaaa9c..916dda773e3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -1216,10 +1216,13 @@ public class OlapTable extends Table implements
MTMVRelatedTableIf, GsonPostProc
return Sets.newHashSet(nameToPartition.keySet());
}
- public List<String> uncheckedGetPartNamesById(List<Long> partitionIds) {
+ // for those elements equal in partiton ids, get their names.
+ public List<String> getEqualPartitionNames(List<Long> partitionIds1,
List<Long> partitionIds2) {
List<String> names = new ArrayList<String>();
- for (Long id : partitionIds) {
- names.add(idToPartition.get(id).getName());
+ for (int i = 0; i < partitionIds1.size(); i++) {
+ if (partitionIds1.get(i).equals(partitionIds2.get(i))) {
+ names.add(getPartition(partitionIds1.get(i)).getName());
+ }
}
return names;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java
b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java
index e0c46dde920..81524ae0208 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java
@@ -58,7 +58,7 @@ public class InsertOverwriteManager extends MasterDaemon
implements Writable {
// but we only change one time and save the relations in partitionPairs.
they're protected by taskLocks
@SerializedName(value = "taskLocks")
private Map<Long, ReentrantLock> taskLocks = Maps.newConcurrentMap();
- // <groupId, <oldPartId, newPartId>>
+ // <groupId, <oldPartId, newPartId>>. no need concern which task it
belongs to.
@SerializedName(value = "partitionPairs")
private Map<Long, Map<Long, Long>> partitionPairs =
Maps.newConcurrentMap();
@@ -91,7 +91,7 @@ public class InsertOverwriteManager extends MasterDaemon
implements Writable {
*
* @return group id, like a transaction id.
*/
- public long preRegisterTask() {
+ public long registerTaskGroup() {
long groupId = Env.getCurrentEnv().getNextId();
taskGroups.put(groupId, new ArrayList<Long>());
taskLocks.put(groupId, new ReentrantLock());
@@ -107,44 +107,81 @@ public class InsertOverwriteManager extends MasterDaemon
implements Writable {
taskGroups.get(groupId).add(taskId);
}
- public List<Long> tryReplacePartitionIds(long groupId, List<Long>
oldPartitionIds) {
+ /**
+ * this func should in lock scope of getLock(groupId)
+ *
+ * @param newIds if have replaced, replace with new. otherwise itself.
+ */
+ public boolean tryReplacePartitionIds(long groupId, List<Long>
oldPartitionIds, List<Long> newIds) {
Map<Long, Long> relations = partitionPairs.get(groupId);
- List<Long> newIds = new ArrayList<Long>();
- for (Long id : oldPartitionIds) {
+ boolean needReplace = false;
+ for (int i = 0; i < oldPartitionIds.size(); i++) {
+ long id = oldPartitionIds.get(i);
if (relations.containsKey(id)) {
// if we replaced it. then return new one.
newIds.add(relations.get(id));
} else {
- // otherwise itself. we will deal it soon.
newIds.add(id);
+ needReplace = true;
}
}
- return newIds;
+ return needReplace;
}
+ // this func should in lock scope of getLock(groupId)
public void recordPartitionPairs(long groupId, List<Long> oldIds,
List<Long> newIds) {
Map<Long, Long> relations = partitionPairs.get(groupId);
Preconditions.checkArgument(oldIds.size() == newIds.size());
for (int i = 0; i < oldIds.size(); i++) {
relations.put(oldIds.get(i), newIds.get(i));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("recorded partition pairs: [" + oldIds.get(i) + ", "
+ newIds.get(i) + "]");
+ }
}
}
+ // lock is a symbol of TaskGroup exist. if not, means already failed.
public ReentrantLock getLock(long groupId) {
return taskLocks.get(groupId);
}
+ // When goes into failure, some BE may still not know and send new request.
+ // it will cause ConcurrentModification or NullPointer.
public void taskGroupFail(long groupId) {
LOG.info("insert overwrite auto detect partition task group [" +
groupId + "] failed");
- for (Long taskId : taskGroups.get(groupId)) {
- taskFail(taskId);
+ ReentrantLock lock = getLock(groupId);
+ lock.lock();
+ try {
+ // will rollback temp partitions in `taskFail`
+ for (Long taskId : taskGroups.get(groupId)) {
+ taskFail(taskId);
+ }
+ cleanTaskGroup(groupId);
+ } finally {
+ lock.unlock();
}
- cleanTaskGroup(groupId);
}
- public void taskGroupSuccess(long groupId) {
+ // here we will make all raplacement of this group visiable. if someone
fails, nothing happen.
+ public void taskGroupSuccess(long groupId, OlapTable targetTable) throws
DdlException {
+ try {
+ Map<Long, Long> relations = partitionPairs.get(groupId);
+ ArrayList<String> oldNames = new ArrayList<>();
+ ArrayList<String> newNames = new ArrayList<>();
+ for (Entry<Long, Long> partitionPair : relations.entrySet()) {
+
oldNames.add(targetTable.getPartition(partitionPair.getKey()).getName());
+
newNames.add(targetTable.getPartition(partitionPair.getValue()).getName());
+ }
+ InsertOverwriteUtil.replacePartition(targetTable, oldNames,
newNames);
+ } catch (Exception e) {
+ LOG.warn("insert overwrite task making replacement failed because
" + e.getMessage()
+ + "all new partition will not be visible and will be
recycled by partition GC.");
+ throw e;
+ }
LOG.info("insert overwrite auto detect partition task group [" +
groupId + "] succeed");
for (Long taskId : taskGroups.get(groupId)) {
+ Env.getCurrentEnv().getEditLog()
+ .logInsertOverwrite(new InsertOverwriteLog(taskId,
tasks.get(taskId), InsertOverwriteOpType.ADD));
taskSuccess(taskId);
}
cleanTaskGroup(groupId);
@@ -164,6 +201,9 @@ public class InsertOverwriteManager extends MasterDaemon
implements Writable {
public void taskFail(long taskId) {
LOG.info("insert overwrite task [" + taskId + "] failed");
boolean rollback = rollback(taskId);
+ if (!rollback) {
+ LOG.warn("roll back task [" + taskId + "] failed");
+ }
if (rollback) {
removeTask(taskId);
} else {
@@ -192,6 +232,7 @@ public class InsertOverwriteManager extends MasterDaemon
implements Writable {
}
}
+ // cancel it. should try to remove them after.
private void cancelTask(long taskId) {
if (tasks.containsKey(taskId)) {
LOG.info("cancel insert overwrite task: {}", tasks.get(taskId));
@@ -201,6 +242,7 @@ public class InsertOverwriteManager extends MasterDaemon
implements Writable {
}
}
+ // task and partitions has been removed. it's safe to remove task.
private void removeTask(long taskId) {
if (tasks.containsKey(taskId)) {
LOG.info("remove insert overwrite task: {}", tasks.get(taskId));
@@ -222,7 +264,7 @@ public class InsertOverwriteManager extends MasterDaemon
implements Writable {
try {
olapTable = task.getTable();
} catch (DdlException e) {
- LOG.warn("can not get table, task: {}", task);
+ LOG.warn("can not get table, task: {}, reason: {}", task,
e.getMessage());
return true;
}
return InsertOverwriteUtil.dropPartitions(olapTable,
task.getTempPartitionNames());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteUtil.java
index 791ca552e01..7f1595ea59d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteUtil.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteUtil.java
@@ -54,6 +54,7 @@ public class InsertOverwriteUtil {
for (int i = 0; i < partitionNames.size(); i++) {
Env.getCurrentEnv().addPartitionLike((Database)
tableIf.getDatabase(), tableIf.getName(),
new AddPartitionLikeClause(tempPartitionNames.get(i),
partitionNames.get(i), true));
+ LOG.info("successfully add temp partition [{}] for [{}]",
tempPartitionNames.get(i), tableIf.getName());
}
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java
index 7df3ea549ea..41568097dad 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java
@@ -169,11 +169,12 @@ public class InsertOverwriteTableCommand extends Command
implements ForwardWithS
try {
if (isAutoDetectOverwrite()) {
// taskId here is a group id. it contains all replace tasks
made and registered in rpc process.
- taskId =
Env.getCurrentEnv().getInsertOverwriteManager().preRegisterTask();
- // When inserting, BE will call to replace partition by
FrontendService. FE do the real
- // add&replacement and return replace result. So there's no
need to do anything else.
+ taskId =
Env.getCurrentEnv().getInsertOverwriteManager().registerTaskGroup();
+ // When inserting, BE will call to replace partition by
FrontendService. FE will register new temp
+ // partitions and return. for transactional, the replacement
will really occur when insert successed,
+ // i.e. `insertInto` finished. then we call taskGroupSuccess
to make replacement.
insertInto(ctx, executor, taskId);
-
Env.getCurrentEnv().getInsertOverwriteManager().taskGroupSuccess(taskId);
+
Env.getCurrentEnv().getInsertOverwriteManager().taskGroupSuccess(taskId,
(OlapTable) targetTable);
} else {
List<String> tempPartitionNames =
InsertOverwriteUtil.generateTempPartitionNames(partitionNames);
taskId = Env.getCurrentEnv().getInsertOverwriteManager()
@@ -184,7 +185,7 @@ public class InsertOverwriteTableCommand extends Command
implements ForwardWithS
Env.getCurrentEnv().getInsertOverwriteManager().taskSuccess(taskId);
}
} catch (Exception e) {
- LOG.warn("insert into overwrite failed");
+ LOG.warn("insert into overwrite failed with task(or group) id " +
taskId);
if (isAutoDetectOverwrite()) {
Env.getCurrentEnv().getInsertOverwriteManager().taskGroupFail(taskId);
} else {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 4688c9927ca..8bf07e81e27 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -287,7 +287,6 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
-import java.util.stream.IntStream;
// Frontend service used to serve all request for this frontend through
// thrift protocol
@@ -3483,7 +3482,7 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
LOG.info("Receive replace partition request: {}", request);
long dbId = request.getDbId();
long tableId = request.getTableId();
- List<Long> partitionIds = request.getPartitionIds();
+ List<Long> reqPartitionIds = request.getPartitionIds();
long taskGroupId = request.getOverwriteGroupId();
TReplacePartitionResult result = new TReplacePartitionResult();
TStatus errorStatus = new TStatus(TStatusCode.RUNTIME_ERROR);
@@ -3522,41 +3521,60 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
OlapTable olapTable = (OlapTable) table;
InsertOverwriteManager overwriteManager =
Env.getCurrentEnv().getInsertOverwriteManager();
ReentrantLock taskLock = overwriteManager.getLock(taskGroupId);
- List<String> allReqPartNames; // all request partitions
+ if (taskLock == null) {
+ errorStatus.setErrorMsgs(Lists
+ .newArrayList(new String("cannot find task group " +
taskGroupId + ", maybe already failed.")));
+ result.setStatus(errorStatus);
+ LOG.warn("send create partition error status: {}", result);
+ return result;
+ }
+
+ ArrayList<Long> resultPartitionIds = new ArrayList<>(); // [1 2 5 6]
-> [7 8 5 6]
+ ArrayList<Long> pendingPartitionIds = new ArrayList<>(); // pending:
[1 2]
+ ArrayList<Long> newPartitionIds = new ArrayList<>(); // requested temp
partition ids. for [7 8]
+ boolean needReplace = false;
try {
taskLock.lock();
+ // double check lock. maybe taskLock is not null, but has been
removed from the Map. means the task failed.
+ if (overwriteManager.getLock(taskGroupId) == null) {
+ errorStatus.setErrorMsgs(Lists
+ .newArrayList(new String("cannot find task group " +
taskGroupId + ", maybe already failed.")));
+ result.setStatus(errorStatus);
+ LOG.warn("send create partition error status: {}", result);
+ return result;
+ }
+
// we dont lock the table. other thread in this txn will be
controled by taskLock.
- // if we have already replaced. dont do it again, but acquire the
recorded new partition directly.
+ // if we have already replaced, dont do it again, but acquire the
recorded new partition directly.
// if not by this txn, just let it fail naturally is ok.
- List<Long> replacedPartIds =
overwriteManager.tryReplacePartitionIds(taskGroupId, partitionIds);
- // here if replacedPartIds still have null. this will throw
exception.
- allReqPartNames =
olapTable.uncheckedGetPartNamesById(replacedPartIds);
-
- List<Long> pendingPartitionIds = IntStream.range(0,
partitionIds.size())
- .filter(i -> partitionIds.get(i) ==
replacedPartIds.get(i)) // equal means not replaced
- .mapToObj(partitionIds::get)
- .collect(Collectors.toList());
- // from here we ONLY deal the pending partitions. not include the
dealed(by others).
- if (!pendingPartitionIds.isEmpty()) {
- // below two must have same order inner.
- List<String> pendingPartitionNames =
olapTable.uncheckedGetPartNamesById(pendingPartitionIds);
- List<String> tempPartitionNames = InsertOverwriteUtil
- .generateTempPartitionNames(pendingPartitionNames);
+ needReplace = overwriteManager.tryReplacePartitionIds(taskGroupId,
reqPartitionIds, resultPartitionIds);
+ // request: [1 2 3 4] result: [1 2 5 6] means ONLY 1 and 2 need
replace.
+ if (needReplace) {
+ // names for [1 2]
+ List<String> pendingPartitionNames =
olapTable.getEqualPartitionNames(reqPartitionIds,
+ resultPartitionIds);
+ for (String name : pendingPartitionNames) {
+
pendingPartitionIds.add(olapTable.getPartition(name).getId()); // put [1 2]
+ }
- long taskId = overwriteManager.registerTask(dbId, tableId,
tempPartitionNames);
+ // names for [7 8]
+ List<String> newTempNames = InsertOverwriteUtil
+ .generateTempPartitionNames(pendingPartitionNames);
+ // a task means one time insert overwrite
+ long taskId = overwriteManager.registerTask(dbId, tableId,
newTempNames);
overwriteManager.registerTaskInGroup(taskGroupId, taskId);
- InsertOverwriteUtil.addTempPartitions(olapTable,
pendingPartitionNames, tempPartitionNames);
- InsertOverwriteUtil.replacePartition(olapTable,
pendingPartitionNames, tempPartitionNames);
+ InsertOverwriteUtil.addTempPartitions(olapTable,
pendingPartitionNames, newTempNames);
// now temp partitions are bumped up and use new names. we get
their ids and record them.
- List<Long> newPartitionIds = new ArrayList<Long>();
- for (String newPartName : pendingPartitionNames) {
-
newPartitionIds.add(olapTable.getPartition(newPartName).getId());
+ for (String newPartName : newTempNames) {
+
newPartitionIds.add(olapTable.getPartition(newPartName).getId()); // put [7 8]
}
overwriteManager.recordPartitionPairs(taskGroupId,
pendingPartitionIds, newPartitionIds);
+
if (LOG.isDebugEnabled()) {
LOG.debug("partition replacement: ");
for (int i = 0; i < pendingPartitionIds.size(); i++) {
- LOG.debug("[" + pendingPartitionIds.get(i) + ", " +
newPartitionIds.get(i) + "], ");
+ LOG.debug("[" + pendingPartitionIds.get(i) + " - " +
pendingPartitionNames.get(i) + ", "
+ + newPartitionIds.get(i) + " - " +
newTempNames.get(i) + "], ");
}
}
}
@@ -3569,15 +3587,38 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
taskLock.unlock();
}
- // build partition & tablets. now all partitions in allReqPartNames
are replaced
- // an recorded.
- // so they won't be changed again. if other transaction changing it.
just let it
- // fail.
- List<TOlapTablePartition> partitions = Lists.newArrayList();
- List<TTabletLocation> tablets = Lists.newArrayList();
+ // result: [1 2 5 6], make it [7 8 5 6]
+ int idx = 0;
+ if (needReplace) {
+ for (int i = 0; i < reqPartitionIds.size(); i++) {
+ if (reqPartitionIds.get(i).equals(resultPartitionIds.get(i))) {
+ resultPartitionIds.set(i, newPartitionIds.get(idx++));
+ }
+ }
+ }
+ if (idx != newPartitionIds.size()) {
+ errorStatus.addToErrorMsgs("changed partition number " + idx + "
is not correct");
+ result.setStatus(errorStatus);
+ LOG.warn("send create partition error status: {}", result);
+ return result;
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("replace partition origin ids: ["
+ + String.join(", ",
reqPartitionIds.stream().map(String::valueOf).collect(Collectors.toList()))
+ + ']');
+ LOG.debug("replace partition result ids: ["
+ + String.join(", ",
resultPartitionIds.stream().map(String::valueOf).collect(Collectors.toList()))
+ + ']');
+ }
+
+ // build partition & tablets. now all partitions in allReqPartNames
are replaced an recorded.
+ // so they won't be changed again. if other transaction changing it.
just let it fail.
+ List<TOlapTablePartition> partitions = new ArrayList<>();
+ List<TTabletLocation> tablets = new ArrayList<>();
PartitionInfo partitionInfo = olapTable.getPartitionInfo();
- for (String partitionName : allReqPartNames) {
- Partition partition = table.getPartition(partitionName);
+ for (long partitionId : resultPartitionIds) {
+ Partition partition = olapTable.getPartition(partitionId);
TOlapTablePartition tPartition = new TOlapTablePartition();
tPartition.setId(partition.getId());
diff --git
a/regression-test/pipeline/cloud_p1/conf/regression-conf-custom.groovy
b/regression-test/pipeline/cloud_p1/conf/regression-conf-custom.groovy
index 1b7c4f4c07c..42a18b7f22e 100644
--- a/regression-test/pipeline/cloud_p1/conf/regression-conf-custom.groovy
+++ b/regression-test/pipeline/cloud_p1/conf/regression-conf-custom.groovy
@@ -9,7 +9,6 @@ excludeSuites = "000_the_start_sentinel_do_not_touch," + //
keep this line as th
"test_profile," +
"test_refresh_mtmv," +
"test_spark_load," +
- "test_iot_auto_detect_concurrent," +
"zzz_the_end_sentinel_do_not_touch" // keep this line as the last line
excludeDirectories = "000_the_start_sentinel_do_not_touch," + // keep this
line as the first line
diff --git a/regression-test/pipeline/p1/conf/regression-conf.groovy
b/regression-test/pipeline/p1/conf/regression-conf.groovy
index f85892e6834..d4ecd55d38f 100644
--- a/regression-test/pipeline/p1/conf/regression-conf.groovy
+++ b/regression-test/pipeline/p1/conf/regression-conf.groovy
@@ -60,7 +60,6 @@ excludeSuites = "000_the_start_sentinel_do_not_touch," + //
keep this line as th
"test_profile," +
"test_refresh_mtmv," +
"test_spark_load," +
- "test_iot_auto_detect_concurrent," +
"zzz_the_end_sentinel_do_not_touch" // keep this line as the last line
// this dir will not be executed
diff --git
a/regression-test/suites/insert_overwrite_p1/test_iot_auto_detect_concurrent.groovy
b/regression-test/suites/insert_overwrite_p1/test_iot_auto_detect_concurrent.groovy
index 200dd874df9..e796edfe5bb 100644
---
a/regression-test/suites/insert_overwrite_p1/test_iot_auto_detect_concurrent.groovy
+++
b/regression-test/suites/insert_overwrite_p1/test_iot_auto_detect_concurrent.groovy
@@ -16,11 +16,7 @@
// under the License.
suite("test_iot_auto_detect_concurrent") {
- // only nereids now
- sql """set enable_nereids_planner = true"""
- sql """set enable_fallback_to_original_planner = false"""
- sql """set enable_nereids_dml = true"""
-
+ // only nereids now. default for 2.1 and later
def db_name = "test_iot_auto_detect_concurrent"
def table_name = "test_concurrent_write"
@@ -33,9 +29,6 @@ suite("test_iot_auto_detect_concurrent") {
def load_data = { range, offset, expect_success ->
try {
sql " use test_iot_auto_detect_concurrent; "
- sql """set enable_nereids_planner = true"""
- sql """set enable_fallback_to_original_planner = false"""
- sql """set enable_nereids_dml = true"""
sql """ insert overwrite table test_concurrent_write partition(*)
select number*10+${offset} from numbers("number" =
"${range}");
"""
@@ -83,11 +76,13 @@ suite("test_iot_auto_detect_concurrent") {
thread5.join()
// suppose result: success zero or one
if (success_status) { // success zero
+ log.info("test 1: success zero")
result = sql " select count(k0) from test_concurrent_write; "
assertEquals(result[0][0], 1000)
result = sql " select count(distinct k0) from test_concurrent_write; "
assertEquals(result[0][0], 1000)
} else { // success one
+ log.info("test 1: success one")
result = sql " select count(k0) from test_concurrent_write; "
assertEquals(result[0][0], 100)
result = sql " select count(distinct k0) from test_concurrent_write; "
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]